Status

Current stateAccepted

Discussion thread: https://lists.apache.org/thread/9wdxthfsbm5xf01y4xvq6qtlg0gq96lq

JIRAhttps://issues.apache.org/jira/browse/KAFKA-16092

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Apache Kafka has achieved great success as a highly scalable event-streaming platform. The way that consumer groups assign partitions to members of the group gives a powerful combination of ordering and scalability, but it does introduce coupling between the number of consumers in a consumer group and the number of partitions. Users of Kafka often have to “over-partition” simply to ensure they can have sufficient parallel consumption to cope with peak loads.

There are plenty of situations in which consumers could cooperatively consume from a stream of events without needing to be assigned exclusive access to specific topic-partitions. This, together with per-message acknowledgement and delivery counts, enables a class of use-cases traditionally built around the concept of a queue. For example, a queue is perfect for a situation in which messages are independent work items that can be processed concurrently by a pool of applications, and individually retried or acknowledged as processing completes. This is much easier to achieve using a queue rather than a partitioned topic with a consumer group.

This KIP introduces the concept of a share group as a way of enabling cooperative consumption using Kafka topics. It does not add the concept of a “queue” to Kafka per se, but rather that introduces cooperative consumption to accommodate these queuing use-cases using regular Kafka topics. Share groups make this possible. You can think of a share group as roughly equivalent to a “durable shared subscription” in existing systems.

This is indeed Queues for Kafka - queues done in a Kafka way, with no maximum queue depth and the ability to reset to a specific time for point-in-time recovery.

Proposed Changes

Share groups allow Kafka consumers to work together cooperatively consuming and processing the records from topics. They are an alternative to consumer groups for situations in which finer-grained sharing is required.

The fundamental differences between a share group and a consumer group are:

  • The consumers in a share group cooperatively consume records with partitions that may be assigned to multiple consumers

  • The number of consumers in a share group can exceed the number of partitions in a topic

  • Records are acknowledged on an individual basis, although the system is optimized to work in batches for improved efficiency

  • Delivery attempts to consumers in a share group are counted to enable automated handling of unprocessable records

Share groups are a new type of group, alongside the existing consumer groups, adding "share"  to the existing group types of "consumer"  and "classic" .

All consumers in the same share group subscribed to the same topic cooperatively consume the records of that topic. If a topic is accessed by consumers in more than one share group, each share group cooperatively consumes from that topic independently of the other share groups.

Each consumer can dynamically set the list of topics it wants to subscribe to. In practice, all of the consumers in a share group will usually subscribe to the same topic or topics.

When a consumer in a share-group fetches records, it receives available records from any of the topic-partitions that match its subscriptions. Records are acquired for delivery to this consumer with a time-limited acquisition lock. While a record is acquired, it is not available for another consumer. By default, the lock duration is 30s, but it can also be controlled using the group group.share.record.lock.duration.ms configuration parameter. The idea is that the lock is automatically released once the lock duration has elapsed, and then the record is available to be given to another consumer. The consumer which holds the lock can deal with it in the following ways:

  • The consumer can acknowledge successful processing of the record

  • The consumer can release the record, which makes the record available for another delivery attempt

  • The consumer can reject the record, which indicates that the record is unprocessable and does not make the record available for another delivery attempt

  • The consumer can do nothing, in which case the lock is automatically released when the lock duration has elapsed

The cluster limits the number of records acquired for consumers for each topic-partition in a share group. Once the limit is reached, fetching records will temporarily yield no further records until the number of acquired records reduces, as naturally happens when the locks time out. This limit is controlled by the broker configuration property group.share.partition.max.record.locks . By limiting the duration of the acquisition lock and automatically releasing the locks, the broker ensures delivery progresses even in the presence of consumer failures.

Concepts

There are some concepts being introduced to Kafka to support share groups.

The group coordinator is now responsible for coordination of share groups as well as consumer groups. The responsibility for being coordinator for the cluster’s share groups is distributed among the brokers, exactly as for consumer groups. For share groups, the group coordinator has the following responsibilities:

  • It maintains the list of share-group members.

  • It manages the topic-partition assignments for the share-group members using a server-side partition assignor. This KIP defines just one assignor, org.apache.kafka.coordinator.group.share.SimpleAssignor, as described below.

A share-partition is a topic-partition with a subscription in a share group. For a topic-partition subscribed in more than one share group, each share group has its own share-partition.

A share-partition leader is a component of the broker which manages the share-group’s view of a topic-partition. It is co-located with the topic-partition leader, and the leadership of a share-partition follows the leadership of the topic-partition. The share-partition leader has the following responsibilities:

  • It fetches the records from the replica manager from the local replica

  • It manages and persists the states of the in-flight records, using the share-group state persister

This means that the fetch-from-follower optimization is not supported by share-groups. The KIP does however include rack information so that consumers could preferentially fetch from share-partitions whose leadership is in the same rack, once a rack-aware assignor is available.

The share coordinator is responsible for persistence of share-group state on a new internal topic. The share-partition leader uses the share-group state persister to communicate with the share coordinator.

The following diagram illustrates how these pieces are wired together using the new Kafka protocol RPCs introduced by this KIP.

Relationship with consumer groups

Consumer groups and share groups exist in the same namespace in a Kafka cluster. As a result, if there’s a consumer group with a particular name, you cannot create a share group with the same name, and vice versa. But consumer groups and share groups are quite different in terms of use, so attempts to perform operations for one kind of group on a group of the incorrect type will fail with a GroupIdNotFoundException .

Because consumer groups and share groups are both created automatically on first use, the type of group that is created depends upon how the group ID was first used. For production use, choosing a naming convention for groups in advance and using this configuration to enforce the group type is recommended.

A future KIP is planned for the administration of groups, encompassing aspects such as administrative creation of groups, and listing groups of all types including consumer groups, share groups and others such as groups used by Kafka Connect.

Share group membership

This KIP builds upon the new consumer group protocol in KIP-848: The Next Generation of the Consumer Rebalance Protocol.

Share group membership is controlled by the group coordinator. Consumers in a share group use the heartbeat mechanism to join, leave and confirm continued membership of the share group, using the new ShareGroupHeartbeat RPC. Share-partition assignment is also piggybacked on the heartbeat mechanism. Share groups only support server-side assignors, which implements the org.apache.kafka.coordinator.group.assignor.ShareGroupPartitionAssignor  interface. There is just one implementation so far, org.apache.kafka.coordinator.group.share.SimpleAssignor .

For a share group, a rebalance is a much less significant event than for a consumer group because there’s no fencing. When a partition is assigned to a member of a share group, it’s telling the member that it should fetch records from that partition, which it may well be sharing with the other members of the share group. The members are not aware of each other, and there’s no synchronization barrier or fencing involved. The group coordinator, using the server-side assignor, is responsible for telling the members which partitions they are assigned and revoked. But the aim is to give every member useful work, rather than to keep the members' assignments safely separated.

For a share group, the group coordinator does not need to persist the assignments, but it does need to persist the assignment epoch so that it doesn't move backwards if the group coordinator changes.

The reconciliation process for a share group is very simple because there is no fencing - the group coordinator revokes the partitions which are no longer in the target assignment of the member and assigns the new partitions to the member at the same time. There’s no need for the revocations to be acknowledged before new partitions are assigned. The member acknowledges changes to its assignment, but the group coordinator does not depend upon receiving the acknowledgement to proceed.

Data model

This is the data model maintained by the group coordinator for share groups.

Share Group and Member

The group and members represent the current state of a share group. This is reminiscent of a simplified consumer group.

Share Group
NameTypeDescription
Group IDstringThe group ID as configured by the consumer. The ID uniquely identifies the group.
Group Epochint32The current epoch of the group. The epoch is incremented by the group coordinator when a new assignment is required for the group.
Server Assignor

string

The server-side assignor used by the group.
Members

[]Member

The set of members in the group.
Partitions Metadata

[]PartitionMetadata

The metadata of the partitions that the group is subscribed to. This is used to detect partition metadata changes.
Member
Name

Type

Description
Member ID

string

The unique identifier of the member. It is generated by the coordinator upon the first heartbeat request and must be used throughout the lifetime of the member.
Rack ID

string

The rack ID configured by the consumer.
Client ID

string

The client ID configured by the consumer.
Client Host

string

The client host of the consumer.
Subscribed Topic Names

[]string

The current set of subscribed topic names configured by the consumer.

Target Assignment

The target assignment of the group. This represents the assignment that all the members of the group will eventually converge to. It is a declarative assignment which is generated by the assignor based on the group state.

Target Assignment
NameTypeDescription
Group ID stringThe group ID as configured by the consumer. The ID uniquely identifies the group.
Assignment Epochint32The epoch of the assignment. It represents the epoch of the group used to generate the assignment. It will eventually match the group epoch.
Assignment Errorint8The error reported by the assignor.
Members[]MemberThe assignment for each member.
Member
NameTypeDescription
Member IDstringThe unique identifier of the member.
Partitions[]TopicIdPartitionThe set of partitions assigned to this member.

Current Assignment

The current assignment of a member.

Current Assignment
NameTypeDescription
Group IDstringThe group ID as configured by the consumer. The ID uniquely identifies the group.
Member IDstringThe member ID of this member.
Member Epochint32The current epoch of this member. The epoch is the assignment epoch of the assignment currently used by this member.
Errorint8The error reported by the assignor.
Partitions[]TopicIdPartitionThe current partitions used by the member.

Rebalance process

The rebalance process is driven by the group coordinator and revolves around three kinds of epochs: the group epoch, the assignment epoch and the member epoch. This is intentionally very similar to how the process works for consumer groups in KIP-848.

Group epoch - Trigger a rebalance

The group coordinator is responsible for triggering a rebalance of the group when the metadata of the group changes in some cases. The group epoch represents the generation of the group metadata. It is incremented whenever the group metadata is updated. This happens in the following cases:

  • A member joins or leaves the group.
  • A member updates its subscriptions.
  • A member is removed from the group by the group coordinator.
  • The partition metadata is updated, such as when a new partition is added or a topic is created or deleted, or when the rack ID changes.
  • AdminClient.alterShareGroupOffsets  is used to set the SPSO.

In all these cases, a new version of the group metadata is calculated by the group coordinator with an incremented group epoch. The new version of the group metadata signals that a new assignment is required for the group.

Assignment epoch - Compute the group assignment

Whenever the group epoch is larger than the target assignment epoch, the group coordinator triggers the computation of a new target assignment based on the latest group metadata using a server-side assignor. The assignment epoch becomes the group epoch of the group metadata used to compute the assignment.

Member epoch - Reconciliation of the group

Each member independently reconciles its current assignment with its new target assignment, converging with the target epoch and assignment.

The group coordinator revokes the partitions which are no longer in the target assignment of the member, and assigns the partitions which have been added to the target assignment of the member. It provides the new assignment to the member in its heartbeat response until the member acknowledges the assignment change in a heartbeat request.

By assigning and revoking partitions for the members of the group, the group coordinator can balance the partitions across the members of the group.

Member ID

Every member is uniquely identified by the UUID called the member ID. This UUID is generated by the group coordinator and given to the member when it joins the group. It is used in all communication with the group coordinator and must be kept during the entire lifespan of the member.

Heartbeat and session

The member uses the new ShareGroupHeartbeat API to establish a session with the group coordinator. The member is expected to heartbeat every group.share.heartbeat.interval.ms in order to keep its session opened. If it does not heartbeat at least once within the group.share.session.timeout.ms, the group coordinator will remove the member from the group. The member is told the heartbeat interval in the response to the ShareGroupHeartbeat API.

If a member is removed from the group because it fails to heartbeat, because there’s intentionally no fencing, at the protocol level, the consumer does not lose the ability to fetch and acknowledge records. A failure to heartbeat is most likely because the consumer has died. If the consumer just failed to heartbeat due to a temporary pause, it could in theory continue to fetch and acknowledge records. When it finally sends a heartbeat and realises it’s been kicked out of the group, it should stop fetching records because its assignment has been revoked, and rejoin the group.

Static membership

Share groups do not support static membership. Because the membership of a share group is more ephemeral, there’s less advantage to maintaining an assignment when a member has temporarily left but will rejoin within the session timeout.

Share group states

Share groups do not have the ASSIGNING state because only server-side assignors are supported, and do not need the RECONCILING state because there’s no need for all members to converge before the group enters the STABLE state. There is no automatic expiration of share groups.

  • EMPTY - When a share group is created or the last member leaves the group, the share group is EMPTY.
  • STABLE - When a share group has active members, the share group is STABLE.
  • DEAD - When the share group remains EMPTY for a configured period, the group coordinator transitions it to DEAD to delete it. This only happens if the group does not have any persistent share-group state. Share groups are intentionally more durable than consumer groups.

Persistence and fail-over

For a share group, the group coordinator persists seven kinds of records:

  • ShareGroupMetadata - this is written to reserve the group's ID as a share group in the namespace of groups.
  • ShareGroupMemberMetadata - this is written to allow group membership to continue seamlessly across a group coordinator change.
  • ShareGroupPartitionMetadata - this is written to persist the metadata about the partitions the group is assigning to the members.
  • ShareGroupTargetAssignmentMetadata - this is written to persist the assignment epoch.
  • ShareGroupTargetAssignmentMember - this is written to persist the target assignment for a member.
  • ShareGroupCurrentMemberAssignment - this is written to persist the current assignment for a member, and also to maintain the sequence of member epochs.
  • ShareGroupStatePartitionMetadata - this is written whenever the set of topic-partitions being consumed in the share group changes. Its purpose is to keep track of which topic-partitions will have share-group persistent state.

When the group coordinator fails over, the newly elected coordinator replays the state from the __consumer_offsets  partition. This means a share group will remain in existence across the fail-over, along with the list of members and their assignments. It will bump the group epoch as a result.

The SimpleAssignor

This KIP introduces org.apache.kafka.coordinator.group.share.SimpleAssignor . It attempts to balance the number of consumers assigned to the subscribed partitions, and assumes that all consumers are equally capable of consuming records. When calculating a new target assignment, the assignor is aware of the current assignment (assuming the group coordinator hasn't changed) and can use this to influence the calculation of the new target assignment. The calculation proceeds as follows:

  1. The assignor hashes the member IDs of the members and maps the partitions assigned to the members based on the hash. This gives approximately even balance.

  2. If any partitions were not assigned any members by (1) and do not have members already assigned in the current assignment, members are assigned round-robin until each partition has at least one member assigned to it.

  3. If any partitions were assigned members by (1) and also have members in the current assignment assigned by (2), the members assigned by (2) are removed.

When the number of members is greater than or equal to the number of partitions for a subscribed topic and every partition was assigned at least one member, adding more members to the group simply assigns more members to the partitions. If however, any partitions were not assigned any members by hashing, adding more members changes the distribution and will revoke some partitions previously assigned. Because assignments in share groups are looser than they are in consumer groups, the group coordinator doesn't require the members to confirm they have revoked partitions before they can be assigned to other members. For a brief period while the rebalancing is occurring, some partitions may have more members consuming than the assignment requires, but this situation soon resolves as the members receive their updated assignments and stop fetching from the revoked partitions.

Here an example of how the assignments might change as members join the share group and the number of partitions changes.

State

Members subscribed to T1

Topic:partitions

Assignment change

Assignments

M1 subscribed to topic T1 with 1 partition

M1 (hash 1)

T1:0

Assign T1:0 to M1

M1 → T1:0

M2 joins the group and subscribes to T1

M1 (hash 1), M2 (hash 2)

T1:0

Assign T1:0 to M2

M1 → T1:0

M2 → T1:0

Add 3 partitions to T1

M1 (hash 1), M2 (hash 2)

T1:0, T1:1, T1:2, T1:3

Assign T1:1 and T1:3 to M2

Revoke T1:0 from M2

Assign T1:2 to M1

M1 → T1:0, T1:2

M2 → T1:1, T1:3

M3 joins the group and subscribes to T1

M1 (hash 1), M2 (hash 2), M3 (hash 3)

T1:0, T1:1, T1:2, T1:3

Assign T1:2 to M3

Revoke T1:2 from M1

M1 → T1:0

M2 → T1:1, T1:3

M3 → T1:2

M4 joins the group and subscribes to T1

M1 (hash 1), M2 (hash 2), M3 (hash 3), M4 (hash 4)

T1:0, T1:1, T1:2, T1:3

Assign T1:3 to M4

Revoke T1:3 from M2

M1 → T1:0

M2 → T1:1

M3 → T1:2

M4 → T1:3

M5, M6, M7, M8 join the group and subscribe to T1

M1 (hash 1), M2 (hash 2), M3 (hash 3), M4 (hash 4), M5 (hash 5), M6 (hash 6), M7 (hash 7), M8 (hash 8)

T1:0, T1:1, T1:2, T1:3

Assign T1:0 to M5

Assign T1:1 to M6

Assign T1:2 to M7

Assign T1:3 to M8

M1 → T1:0

M2 → T1:1

M3 → T1:2

M4 → T1:3

M5 → T1:0

M6 → T1:1

M7 → T1:2

M8 → T1:3

M1 subscribes to T2 with 2 partitions

M1 (hash 1), M2 (hash 2), M3 (hash 3), M4 (hash 4), M5 (hash 5), M6 (hash 6), M7 (hash 7), M8 (hash 8)

T1:0, T1:1, T1:2, T1:3

T2:0, T2:1

Assign T2:0 and T2:1 to M1

M1 → T1:0, T2:0, T2:1

M2 → T1:1

M3 → T1:2

M4 → T1:3

M5 → T1:0

M6 → T1:1

M7 → T1:2

M8 → T1:3

All members leave except M2

M2 (hash 2)

T1:0, T1:1, T1:2, T1:3

Assign T1:0, T1:2, T1:3 to M2

M2 → T1:0, T1:1, T1:2, T1:3

SimpleAssignor does not make assignments based on rack IDs. SimpleAssignor does not make assignments based on lag or consumer throughput.

Record delivery and acknowledgement

This section describes how records are delivered to consumers and how the consumers acknowledge whether delivery was successful.

In-flight records

For each share-partition, the share group adds some state management for the records being consumed. The starting offset of records which are eligible for consumption is known as the share-partition start offset (SPSO), and the last offset of records which are eligible for consumption is known as the share-partition end offset (SPEO). The records between starting at the SPSO and up to the SPEO are known as the in-flight records. So, a share-partition is essentially managing the consumption of the in-flight records.

The SPEO is not necessarily always at the end of the topic-partition and it just advances freely as records are fetched beyond this point. The segment of the topic-partition between the SPSO and the SPEO is a sliding window that moves as records are consumed. The share-partition leader limits the distance between the SPSO and the SPEO. The upper bound is controlled by the broker configuration group.share.partition.max.record.locks. Unlike existing queuing systems, there’s no “maximum queue depth”, but there is a limit to the number of in-flight records at any point in time.

The records in a share-partition are in one of four states:

State

Description

Available

The record is available for a consumer

Acquired

The record has been acquired for a specific consumer, with a time-limited acquisition lock

Acknowledged

The record has been processed and acknowledged by a consumer

Archived

The record is not available for a consumer

All records before the SPSO are in Archived state. All records after the SPEO are in Available state, but not yet being delivered to consumers.

The records also have a delivery count in order to prevent unprocessable records being endlessly delivered to consumers. If a record is repeatedly causing exceptions during its processing, it is likely that it is a “poison message”, perhaps with a formatting or semantic error. Every time that a record is acquired by a consumer in a share group, its delivery count increments by 1. The first time a record is acquired, its delivery count is 1.

The state transitions look like this:

+--------------+
|  Available   |<------------------+
+--------------+                   |
       |                           |
       | acquired for consumer     | - if (delivery count < group.share.delivery.attempt.limit)
       | (delivery count++)        |     - released by consumer
       |                           |     - acquisition lock elapsed
       V                           |
+--------------+                   |
|   Acquired   |-------------------+
+--------------+                   |
       |                           |
       |                           | - if (delivery count == group.share.delivery.attempt.limit)
       | accepted by consumer      |     - released by consumer
       |                           |     - acquisition lock elapsed
       V                           | OR
+--------------+                   | - rejected by consumer as unprocessable
| Acknowledged |                   |
+--------------+                   |
       |                           |
       |                           |
       | SPSO moves past record    |
       |                           |
       V                           |
+--------------+                   |
|   Archived   |<------------------+
+--------------+

The share-partition leader persists the states and delivery counts. These updates are not performed with exactly-once semantics, so the delivery count cannot be relied upon to be precise in all situations. It is intended as a way to protect against poison messages, rather than a precise count of the number of times a record is delivered to a consumer.

When records are fetched for a consumer, the share-partition leader starts at the SPSO and finds Available records. For each record it finds, it moves it into Acquired state, bumps its delivery count and adds it to a batch of acquired records to return to the consumer. The consumer then processes the records and acknowledges their consumption. The delivery attempt completes successfully and the records move into Acknowledged state.

Alternatively, if the consumer cannot process a record or its acquisition lock elapses, the delivery attempt completes unsuccessfully and the record’s next state depends on the delivery count. If the delivery count has reached the cluster’s share delivery attempt limit (5 by default), the record moves into Archived state and is not eligible for additional delivery attempts. If the delivery count has not reached the limit, the record moves back into Available state and can be delivered again.

This means that the delivery behavior is at-least-once.

Ordering

Share groups focus primarily on sharing to allow consumers to be scaled independently of partitions. The records in a share-partition can be delivered out of order to a consumer, in particular when redeliveries occur.

For example, imagine two consumers in a share group consuming from a single-partition topic. The first consumer fetches records 100 to 109 inclusive and then crashes. At the same time, the second consumer fetches, processes and acknowledges records 110 to 119. When the second consumer fetches again, it gets records 100 to 109 with their delivery counts set to 2 because they are being redelivered. That’s exactly what you want, but the offsets do not necessarily increase monotonically in the same way as they do for a consumer group.

The records returned in a batch for particular share-partition are guaranteed to be in order of increasing offset. There are no guarantees about the ordering of offsets between different batches.

Managing the SPSO and SPEO

The consumer group concepts of seeking and position do not apply to share groups. The SPSO for each share-partition can be initialized for an empty share group and the SPEO naturally moves forwards as records are consumed.

When a topic subscription is added to a share group for the first time, the SPSO is initialized for each share-partition. By default, the SPSO for each share-partition is initialized to the latest offset for the corresponding topic-partitions.

Alternatively, there is an administrative action available using either AdminClient.alterShareGroupOffsets or the kafka-share-groups.sh tool to reset the SPSO for an empty share group with no active members. This can be used to “reset” a share group to the start of a topic, a particular timestamp or the end of a topic. It can also be used to initialize the share group to the start of a topic. Resetting the SPSO discards all of the in-flight record state and delivery counts.

For example, to start using a share group S1 to consume for the first time from the earliest offset of a topic T1, you could use:

$ kafka-share-groups.sh --bootstrap-server localhost:9092 --group S1 --topic T1 --reset-offsets --to-earliest --execute

If the number of partitions is increased for a topic with a subscription in a share group, the SPSO for the newly created share-partitions is initialized to 0 (which is of course both the earliest and latest offset for an empty topic-partition). This means there is no doubt about what happens when the number of partitions is increased.

If the SPSO is reset to an offset that has been tiered to remote storage (KIP-405: Kafka Tiered Storage), there will be a performance impact just as for existing consumers fetching records from remote storage.

Log retention

The SPSO for each share-partition is bounded by the log start offset (LSO) of the topic-partition, which is itself managed by the retention policy.

If log segments are being retained based on time, when an inactive log segment's age exceeds the configured time, the LSO advances to the start of the next log segment and the old segment is deleted. If the SPSO is within the log segment that is deleted, it will also advance to the next log segment. This is roughly equivalent to message-based expiration in other messaging systems.

If log segments are being retained based on size, when the log exceeds the configured size, the LSO advances to the start of the next log segment and the old segment is deleted. If the SPSO is within the log segment that is deleted, it will also advance to the next log segment. This keeps control of the space consumed by the log, but it does potentially silently remove records that were eligible for delivery. When using share groups with log retention based on size, it is important to bear this in mind.

When the SPSO advances because of the LSO moving, the in-flight records past which the SPSO moves logically move into Archived state. The exception is that records which are already Acquired for delivery to consumers can be acknowledged with any AcknowledgeType, at which point they logically transition into Archived state too; there's no need to throw an exception for a consumer which has just processed a record which is about to become Archived.

Note that because the share groups are all consuming from the same log, the retention behavior for a topic applies to all of the share groups consuming from that topic.

Log compaction

When share groups are consuming from compacted topics, there is the possibility that in-flight records are cleaned while being consumed. In this case, the delivery flow for these records continues as normal because the disappearance of the cleaned records will only be discovered when they are next fetched from the log. This is analogous to a consumer group reading from a compacted topic - records which have been fetched by the consumer can continue to be processed, but if the consumer tried to fetch them again, it would discover they were no longer there.

When fetching records from a compacted topic, it is possible that record batches fetched have offset gaps which correspond to records the log cleaner removed. This simple results in gaps of the range of offsets of the in-flight records.

In-flight records example

An example of a share-partition showing the states looks like this:

+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|   0   |   1   |   2   |   3   |   4   |   5   |   6   |   7   |   8   |   9   |  ...  | <- offset
| Archv | Archv | Acqrd | Avail | Acqrd | Acked | Archv | Avail | Avail | Avail | Avail | <- state
|       |       |   1   |   2   |   1   |       |       |       |       |       |       | <- delivery count
+-------+-------+---^---+-------+-------+-------+-------+-------+-------+---^---+-------+
                    |                                                       |
                    +-- Share-partition start offset (SPSO)                 +-- Share-partition end offset (SPEO) 

The share group is currently managing the consumption of the in-flight records, which have offsets 2 to 8 inclusive.

  • All records earlier than offset 2 are in Archived state and are not in-flight

  • Records 2 and 4 have been acquired for consumption by a consumer, and their delivery counts have been incremented to 1

  • Record 3 has previously been acquired twice for consumption by a consumer, but went back into Available state

  • Record 5 has been acknowledged

  • Record 6 has previously been acquired for consumption by a consumer, was rejected because it cannot be processed, and is in Archived state

  • Records 7 and 8 are available for consumption by a consumer

  • All records starting with offset 9 and later are in Available state

The cluster records this information durably. In this example, the durable state contains the SPSO position, the non-zero delivery count for offset 3, the Acknowledged state of offset 5, and the Archived state of offset 6.

Batching

Cooperative consumption is inherently record-based, but the expectation is that batching is used to maximise performance. For example, in the case where all records in a batch are processed successfully:

  • When a consumer fetches records, the share-partition leader prefers to return complete record batches.

  • In the usual and optimal case, all of the records in a batch will be in Available state and can all be moved to Acquired state with the same acquisition lock time-out.

  • When the consumer has processed the fetched records, it can acknowledge delivery of all of the records as a single batch, transitioning them all into Acknowledged state.

So, when a bunch of consumers are cooperatively consumed from a topic using a share group, the natural unit of sharing is the record batch. The processing loop is roughly:

  • Fetch record batch

  • Process records

  • Acknowledge all records in batch

In the situation where some records in a batch have been released or rejected separately, subsequent fetches of those records are more likely to have gaps.

Fetching and acknowledging records

Share groups introduce two new APIs in the Kafka protocol for fetching and acknowledging records.

  • ShareFetch  for fetching records from share-partition leaders, and optionally acknowledging delivery
  • ShareAcknowledge  for acknowledging delivery with share-partition leaders

In order to ensure no share-partitions are starved from records being fetched, the share-partition leader rotates the order of share-partitions for which it returns partition information. This ensures that it eventually returns data about all partitions for which data is available.

When a batch of records is first read from the log and added to the in-flight records for a share-partition, the broker does not know whether the set of records between the batch’s base offset and the last offset contains any gaps, as might occur for example as a result of log compaction. When the broker does not know which offsets correspond to records, the batch is considered an unmaterialized record batch. Rather than forcing the broker to iterate through all of the records in all cases, which might require decompressing every batch, the broker sends unmaterialized record batches to consumers. It initially assumes that all offsets between the base offset and the last offset correspond to records. When the consumer processes the batch, it may find gaps and it reports these using the ShareFetch or ShareAcknowledge API. This means that the presence of unmaterialized record batches containing gaps might temporarily inflate the number of in-flight records, but this will be resolved by the consumer acknowledgements.

By iterating over the record batches but not iterating over the individual records within, the share-partition leader is able to understand the log without having to decompress the records. There is one exception to this and that is to do with reading the transaction end markers as described in the next section.

The share-partition leader does not maintain an explicit cache of records that it has fetched. As a result, it may occasionally have to re-fetch records for redelivery, which is an unusual case.

Reading transactional records

Each consumer in a consumer group has its own isolation level which controls how it handles records which were produced in transactions. In a consumer group, a consumer using read_committed isolation level is only able to fetch records up to the last stable offset (LSO). It is also responsible for filtering out transactional records which were aborted. This filtering happens in the client.

For a share group, the concept of isolation level applies to the entire group, not each consumer. The isolation level of a share group is controlled by the group configuration group.share.isolation.level.

For the read_uncommitted isolation level, which is the default, the share group consumes all non-transactional and transactional records. The SPEO is bounded by the high-water mark.

For the read_committed isolation level, the share group only consumes non-transactional records and committed transactional records. The set of records which are eligible to become in-flight records are non-transactional records and committed transactional records only. The SPEO cannot move past the last stable offset, so an open transaction blocks the progress of the share group with read_committed isolation level. The share-partition leader itself is responsible for filtering out transactional records which have been aborted. This can be done looking at the headers of the record batches without having to iterate of the individual records.

The records are fetched from the replica manager using FetchIsolation.TXN_COMMITTED . This means that the replica manager only returns records up to the last stable offset, meaning that any transactional records fetched were part of transactions which have already been completed. The replica manager also returns a list of aborted transactions relevant to the records which are fetched, and the share-partition leader uses this to filter out records for transactions which aborted.

When a RecordBatch  of non-transactional records is fetched, the records are immediately added to the set of in-flight records. When a RecordBatch  of transactional records is fetched, the share-partition leader compares the batch's producer ID and offsets against the list of aborted transactions. If the transaction did not abort, the records are added to the set of in-flight records.

Share sessions

The ShareFetch API works very much like incremental fetch using a concept called a share session. Each share session contains a set of topic-partitions which are managed in the share-partition leaders. The share-partition leader manages the fetching of records and the in-flight record state for its share-partitions. The consumer adds and removes topic-partitions from its share session using the ShareFetch API just like the Fetch API is used for incremental fetch. With the Fetch API, the consumer specifies the fetch offset. With the ShareFetch API, the consumer just fetches records and the share-partition leader decides which records to return.

A share session encapsulates the state of a share group member fetching and acknowledging records. Some of the concepts are reminiscent of fetch sessions introduced in KIP-227, but they are not identical. A share session consists of:

  • The group ID of the share group

  • The member ID for the consumer (UUID), assigned by the group coordinator when the consumer joins the share group

  • The 32-bit session epoch, monotonically incrementing starting at 1 and wrapping after reached Integer.MAX_VALUE

  • The set of share-partitions which are being fetched

  • The acquisition locks for records being delivered to the consumer

  • The time when the share session was last used

The share session provides a way to maintain context for a consumer in a share group across a sequence of ShareFetch  and ShareAcknowledge  requests. A share session is created using a ShareFetch  request, then used by ShareFetch  and ShareAcknowledge  requests, and finally closed by either a ShareFetch  or a ShareAcknowledge  request. When the connection between a client and broker is disconnected, any share session is automatically closed.

Share sessions use memory on the share-partition leader. Each broker that is a share-partition leader has a cache of share sessions. Because a share session is an integral part of how share groups work, as opposed to a performance optimisation in the manner of fetch sessions, the limit is calculated by multiplying the share group configurations group.share.max.groups  and group.share.max.size . Sessions are evicted if they have been inactive for more than 120,000 milliseconds (2 minutes). The key of the cache is the pair (GroupId, MemberId).

The sessions are handled as follows:

RPC request

Request ShareSessionEpoch

Meaning

ShareFetch (GroupId, MemberId, ShareSessionEpoch)

0

This is a full fetch request. It contains the complete set of topic-partitions to fetch. It cannot contains any acknowledgements.

If the request contains acknowledgements, fails with error code INVALID_REQUEST .

If the session is in the cache, discard the existing session releasing all acquired records.

Create a new session in the cache with epoch 1.

ShareFetch (GroupId, MemberId, ShareSessionEpoch)

1 to Integer.MAX_VALUE inclusive

This is an incremental fetch request. It contains a partial set of topic-partitions to be applied to the set already in the cache. It can contain a set of acknowledgements to perform before returning the fetched data.

If the session is not in the cache, fails with error code SHARE_SESSION_NOT_FOUND .

If the session is in the cache and the request epoch is incorrect, fails with error code INVALID_SHARE_SESSION_EPOCH .

Otherwise, update the set of topic-partitions in the cache, increment the epoch in the cache, process the acknowledgements, and fetch records from the replica manager.

ShareFetch (GroupId, MemberId, ShareSessionEpoch)

-1

This is a final fetch request. It can contain a final set of acknowledgements, but its primary purpose is to close the share session.

If the request contains a list of topics to add or forget, fails with error code INVALID_REQUEST .

If the session is not in the cache, fails with error code SHARE_SESSION_NOT_FOUND .

If the session is in the cache and the request epoch is incorrect, fails with error code INVALID_SHARE_SESSION_EPOCH .

Otherwise, process the acknowledgements, release any remaining acquired records for the share session, and remove the share session from the cache.

ShareAcknowledge (GroupId, MemberId, ShareSessionEpoch)

0

Fails with error code INVALID_SHARE_SESSION_EPOCH . It’s not permitted to create a share session with this request.

ShareAcknowledge (GroupId, MemberId, ShareSessionEpoch)

1 to Integer.MAX_VALUE inclusive

If the session is not in the cache, fails with error code SHARE_SESSION_NOT_FOUND .

If the session is in the cache and the request epoch is incorrect, fails with error code INVALID_SHARE_SESSION_EPOCH .

Otherwise, process the acknowledgements and increment the epoch in the cache.

ShareAcknowledge (GroupId, MemberId, ShareSessionEpoch)

-1

If the session is not in the cache, fails with error code SHARE_SESSION_NOT_FOUND .

If the session is in the cache, process the acknowledgements, release and remaining acquired records for the share session, and remove the share session from the cache.

Client programming interface

A new interface KafkaShareConsumer is introduced for consuming from share groups. It looks very similar to KafkaConsumer trimmed down to the methods that apply to share groups.

In order to retain similarity with KafkaConsumer  and make it easy for applications to move between the two interfaces, KafkaShareConsumer  follows the same threading rules as KafkaConsumer . It is not thread-safe and only one thread at a time may call the methods of KafkaShareConsumer . Unsynchronized access will result in ConcurrentModificationException . The only exception to this rule is KafkaShareConsumer.wakeup()  which may be called from any thread.

To join a share group, the client application instantiates a KafkaShareConsumer using the configuration parameter group.id to give the ID of the share group. Then, it uses KafkaShareConsumer.subscribe(Collection<String> topics) to provide the list of topics that it wishes to consume from. The consumer is not allowed to assign partitions itself.

Each call to KafkaShareConsumer.poll(Duration) fetches data from any of the topic-partitions for the topics to which it subscribed. It returns a set of in-flight records acquired for this consumer for the duration of the acquisition lock timeout. For efficiency, the consumer preferentially returns complete record sets with no gaps. The application then processes the records and acknowledges their delivery, either using explicit or implicit acknowledgement. KafkaShareConsumer  works out which style of acknowledgement is being used by the order of calls the application makes. It is not permissible to mix the two styles of acknowledgement.

If the application calls the KafkaShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType) method for any record in the batch, it is using explicit acknowledgement. In this case:

  • The application calls KafkaShareConsumer.commitSync/Async() which commits the acknowledgements to Kafka. If any records in the batch were not acknowledged, they remain acquired and will be presented to the application in response to a future poll.

  • The application calls KafkaShareConsumer.poll(Duration) without committing first, which commits the acknowledgements to Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgement. If any records in the batch were not acknowledged, they remain acquired and will be presented to the application in response to a future poll.

  • The application calls KafkaShareConsumer.close() which attempts to commit any pending acknowledgements and releases any remaining acquired records.

If the application does not call KafkaShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType) for any record in the batch, it is using implicit acknowledgement. In this case:

  • The application calls KafkaShareConsumer.commitSync/Async() which implicitly acknowledges all of the delivered records as processed successfully and commits the acknowledgements to Kafka.

  • The application calls KafkaShareConsumer.poll(Duration) without committing, which also implicitly acknowledges all of the delivered records and commits the acknowledgements to Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgements.

  • The application calls KafkaShareConsumer.close() which releases any acquired records without acknowledgement.

The KafkaShareConsumer guarantees that the records returned in the ConsumerRecords object for a specific share-partition are in order of increasing offset. For each share-partition, the share-partition leader guarantees that acknowledgements for the records in a batch are performed atomically. This makes error handling significantly more straightforward because there can be one error code per share-partition.

When the share-partition leader receives a request to acknowledge delivery, which can occur as a separate RPC or piggybacked on a request to fetch more records, it checks that the records being acknowledged are still in the Acquired state and acquired by the share group member trying to acknowledge them. If a record had reached its acquisition lock timeout and reverted to Available state, the attempt to acknowledge it will fail with org.apache.kafka.common.errors.InvalidRecordStateException, but the record may well be re-acquired for the same consumer and returned to it again.

Handling bad records

There are several ways in which a "bad" record might manifest and each is handled in a different way.

If a record was delivered to a consumer normally but it could not be processed, the consumer can use the KafkaShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType) method to release or reject the record, depending on whether it is treated as a transient or permanent error.

If a record was delivered to a consumer but could not be deserialized, the KafkaShareConsumer.poll(Duration) method throws an org.apache.kafka.common.errors.RecordDeserializationException  which contains the record's partition and offset information. The KafkaShareConsumer  automatically releases the record, although the application can override this if it is using explicit acknowledgement. The next call to KafkaShareConsumer.poll(Duration)  skips over the bad record so progress continues to be made.

If the consumer is configured to check CRCs (which is the default using the check.crcs  configuration property), the KafkaShareConsumer.poll(Duration)  throws an org.apache.kafka.common.errors.CorruptRecordException  The KafkaShareConsumer  automatically rejects the entire record batch, meaning that the record batch is not redelivered. The next call to KafkaShareConsumer.poll(Duration)  skips over the bad record batch so progress continues to be made.

Acknowledgement commit callback

Acknowledgements errors are delivered to a new kind of callback called an acknowledgement commit callback which can optionally be registered with a KafkaShareConsumer .

  • If the application uses KafkaShareConsumer.commitSync() to commit its acknowledgements, the results of the acknowledgements is returned to the application

  • If the application uses KafkaShareConsumer.commitAsync()  or KafkaShareConsumer.poll(Duration) to commit its acknowledgements, the results of the acknowledgements are only delivered if there is an acknowledgement commit callback registered.

The acknowledgement commit callback is called on the application thread and it is not permitted to call the methods of KafkaShareConsumer  with the exception of KafkaShareConsumer.wakeup() . It can be called during any of KafkaShareConsumer.poll() , KafkaShareConsumer.commitAsync() , KafkaShareConsumer.commitSync()  and KafkaShareConsumer.close() .

Example - Acknowledging a batch of records (implicit acknowledgement)

In this example, a consumer using share group "myshare" subscribes to topic "foo". It processes all of the records in the batch and then calls KafkaShareConsumer.commitSync() which implicitly marks all of the records in the batch as successfully consumed and commits the acknowledgement synchronously with Kafka. Asynchronous commit would also be acceptable.

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "myshare");

KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("foo"));
while (true) { 
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));    // Return a batch of acquired records
    for (ConsumerRecord<String, String> record : records) {
        doProcessing(record);
    }
    consumer.commitSync();                                                              // Commit the acknowledgement of all the records in the batch
}

Behind the scenes, the KafkaShareConsumer fetches records from the share-partition leader. The leader selects the records in Available state, and returns complete record batches (https://kafka.apache.org/documentation/#recordbatch). It moves the records into Acquired state, increments the delivery count, starts the acquisition lock timeout, and returns them to the KafkaShareConsumer . Then the KafkaShareConsumer keeps a map of the state of the records it has fetched and returns a batch to the application.

When the application calls KafkaShareConsumer.commitSync(), the KafkaConsumer updates the state map by marking all of the records in the batch as Acknowledged and it then commits the acknowledgements by sending the new state information to the share-partition leader. For each share-partition, the share-partition leader updates the record states atomically.

A more efficient way of operating is to let the acknowledgement of delivery be performed as part of KafkaShareConsumer.poll(). Each poll acknowledges the batch of records acquired by the previous poll, and it allows the fetching and acknowledgements to be combined.

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "myshare");

KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("foo"));
while (true) { 
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));    // Return a batch of acquired records and commit the acknowledgement of records in the previous batch
    for (ConsumerRecord<String, String> record : records) {
        doProcessing(record);
    }
}

Example - Per-record acknowledgement (explicit acknowledgement)

In this example, the application uses the result of processing the records to acknowledge or reject the records in the batch.

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "myshare");

KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("foo"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));    // Return a batch of acquired records
    for (ConsumerRecord<String, String> record : records) {
        try {
            doProcessing(record);
            consumer.acknowledge(record, AcknowledgeType.ACCEPT);                       // Mark the record as processed successfully
        } catch (Exception e) {
            consumer.acknowledge(record, AcknowledgeType.REJECT);                       // Mark the record as unprocessable
        }
    }
    consumer.commitAsync();                                                             // Commit the acknowledgements of all the records in the batch
}

In this example, each record processed is separately acknowledged using a call to the new KafkaShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType) method. The AcknowledgeType argument indicates whether the record was processed successfully or not. In this case, the bad records are rejected meaning that they’re not eligible for further delivery attempts. For a permanent error such as a semantic error, this is appropriate. For a transient error which might not affect a subsequent processing attempt, the AcknowledgeType.RELEASE is more appropriate because the record remains eligible for further delivery attempts.

The calls to KafkaShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType) are simply updating the state map in the KafkaConsumer. It is only once KafkaShareConsumer.commitAsync() is called that the acknowledgements are committed by sending the new state information to the share-partition leader.

Example - Per-record acknowledgement, ending processing of the batch on an error (explicit acknowledgement)

In this example, the application stops processing the batch when it encounters an exception.

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "myshare");

KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("foo"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));    // Return a batch of acquired records
    for (ConsumerRecord<String, String> record : records) {
        try {
            doProcessing(record);
            consumer.acknowledge(record, AcknowledgeType.ACCEPT);                       // Mark the record as processed successfully
        } catch (Exception e) {
            consumer.acknowledge(record, AcknowledgeType.REJECT);                       // Mark this record as unprocessable
            break;
        }
    }
    consumer.commitAsync();                                                             // Commit the acknowledgements of the acknowledged records only
}

There are the following cases in this example:

  1. The batch contains no records, in which case the application just polls again. The call to KafkaShareConsumer.commitAsync() just does nothing because the batch was empty.

  2. All of the records in the batch are processed successfully. The calls to KafkaShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType.ACCEPT) mark all records in the batch as successfully processed.

  3. One of the records encounters an exception. The call to KafkaShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType.REJECT) rejects that record. Earlier records in the batch have already been marked as successfully processed. The call to KafkaShareConsumer.commitAsync() commits the acknowledgements, but the records after the failed record remain Acquired as part of the same delivery attempt and will be presented to the application in response to another poll.

Access control

Share group access control is performed on the GROUP resource type, just the same as consumer groups, with the same rules for the actions checked. A share group is just a new kind of group.

  • Operations which read information about a share group need permission to perform the DESCRIBE action on the named group resource

  • Operations which change information about a share group (such as consuming a record) need permission to perform the READ action on the named group resource

There are also some internal APIs which are used by the share-group persister to communicate with the share coordinator. These are inter-broker RPCs and they are authorized as cluster actions.

A table containing the ACLs for the new RPCs can be found in the section on changes to the Kafka protocol below.

Managing durable share-partition state

The share-partition leader is responsible for recording the durable state for the share-partitions it leads. For each share-partition, we need to be able to recover:

  • The Share-Partition Start Offset (SPSO)

  • The state of the in-flight records

  • The delivery counts of records whose delivery failed

The delivery counts are only maintained approximately and the Acquired state is not persisted. This minimises the amount of share-partition state that has to be logged and the frequency with which it is logged. The expectation is that most records will be fetched and acknowledged in batches with only one delivery attempt, and this requires just one update to the durable share-partition state. Indeed, if there’s only a single consumer for a share-partition, only the SPSO needs to be logged as it moves along the topic. That’s of course an unlikely situation given these are share groups, but it illustrates the point.

Share-group state persister interface

The component which manages durable share-partition state is called the share-group state persister. Eventually, this could be a pluggable component which implements a new interface org.apache.kafka.server.group.share.Persister . For now, it is not pluggable, and Kafka includes a built-in default implementation org.apache.kafka.server.group.share.DefaultStatePersister . This section describes the behavior of this built-in persister.

When records are acknowledged using the ShareFetch or ShareAcknowledge RPCs, durable state will need to be written by the persister. The share-partition leader serving these requests will wait for the persister to complete writing the durable state before responding.

The share coordinator and the share-group state topic

The share coordinator is responsible for persistence of share-group state on a new internal topic called "__share_group_state" . The responsibility for being a share coordinator is distributed across the brokers in a cluster. This model has similarities with the group coordinator and the transaction coordinator. Indeed, the share coordinator is built using the coordinator runtime, like the new group coordinator. The share-group state topic is highly partitioned (50 by default), but it is not compacted. It uses the clean-up policy of "delete" with unlimited retention ("retention.ms=-1"). The share coordinator manages the space used by pruning records periodically as described later. The responsibility for being a share coordinator is controlled in the same way as a group coordinator; the broker which leads a partition of the share-group state topic is the coordinator for the share-partitions whose records belong on that partition.

The existing group coordinator is responsible for the management and assignment for share groups. While it would also be possible to incorporate the responsibility to persist share-group state into the existing group coordinator and write this information onto the consumer offsets topic, that topic is already a known bottleneck for some users and adding to the problem does not seem prudent.

The share-partition leader and group coordinator use inter-broker RPCs to ask the share coordinator to read, write and delete share-partition state on the share-group state topic. The new RPCs are called InitializeShareGroupState , ReadShareGroupState , WriteShareGroupState  and DeleteShareGroupState . The share-partition leader uses the FindCoordinator RPC to find its share coordinator, with a new key_type FindCoordinatorRequest.CoordinatorType.SHARE  (2) and a key consisting of "group:topicId:partition".

When a share coordinator becomes leader of a partition of the share-group state topic, it scans the partition to replay the records it finds and build its in-memory state. Once complete, it is ready to serve the share-group state RPCs.

The records on the share-group state topic are keyed by (group, topicId, partition). This means that the records for each share-partition are on the same share-group state topic partition. It also means that a share-group with a lot of consumers and many share-partitions has its state spread across many share coordinators. This is intentional to improve scalability.

Share-group state records

The share coordinator writes two types of record: ShareSnapshot and ShareUpdate. The ShareSnapshot record contains a complete snapshot of the durable state for a share-partition. Repeatedly re-serializing and writing the entire state introduces a performance bottleneck. The ShareUpdate record contains a partial update to the state. So the current durable state of a share-partition consists of the latest ShareSnapshot and zero or more ShareUpdate records following it.

The records also include a snapshot epoch. Each time a ShareSnapshot is written for a share-partition, the snapshot epoch for that share-partition is increased by 1. Each time a ShareUpdate is written for a share-partition, it uses the snapshot epoch of the latest ShareSnapshot.

The share coordinator will prefer to write a snapshot over an update (for example, when the SPSO moves and there are no in-flight records, the snapshot will be small and there’s no need to write an update instead). The share coordinator will take a snapshot periodically, frequently enough to minimise the number of ShareUpdate records to replay but rarely enough to minimise the performance cost of taking snapshots.

There are two kinds of fencing for share-group state.

The records include a state epoch. This is used to ensure that all of the components involved are aligned on the current state, and to fence any calls to write to an old version of the state. Whenever the share-group state is initialized, the state epoch is set to the share group's current group epoch. This gives a very simple way to make sure that reads and writes refer to the current version of the state.

The records also include a leader epoch. Whenever the share-partition leader calls the share coordinator, it provides the leader epoch of the partition in the request. The share coordinator uses this to fence zombie share-partition leaders. When a new leader is elected for a share-partition, the leader epoch of the partition is incremented. This means that the new leader will use a higher leader epoch in its requests to the share coordinator, and any trailing requests from earlier share-partition leaders can be rejected with FENCED_LEADER_EPOCH . The leader epoch is persisted by the share coordinator in its ShareSnapshot and ShareUpdate records. When a new state epoch is used, the leader epoch is initialized to -1 , and it is properly initialized when a share-partition leader makes a request.

The records have the following content (note that the version number is used to differentiate between the record types, just as for the consumer-offsets topic):

Type

Key

Value

ShareSnapshot

Version: 0
GroupId: string
TopicId: uuid
Partition: int32
StateEpoch: uint32
SnapshotEpoch: uint32
StartOffset: int64
States[]:
  FirstOffset: int64
  LastOffset: int64
  DeliveryState: int8
  DeliveryCount: int16

ShareUpdate

Version: 1
GroupId: string
TopicId: uuid
Partition: int32
StateEpoch: uint32
SnapshotEpoch: uint32
StartOffset: int64 (can be -1 if not updated)
States[]:
  FirstOffset: int64
  LastOffset: int64
  DeliveryState: int8
  DeliveryCount: int16

When a share coordinator is elected, it replays the records on its partition of the share-group state topic. It starts at the earliest offset, reading all of the records until the end of the partition. For each share-partition it finds, it needs to replay the latest ShareSnapshot and any subsequent ShareUpdate records whose snapshot epoch matches. Until replay is complete for a share-partition, the RPCs for mutating the share-state return the COORDINATOR_LOAD_IN_PROGRESS error code.

Managing the space of the share-group state topic

The share-group state data is not very amenable to log compaction. As a result, the share coordinator uses unlimited log retention and prunes the log records itself using ReplicaManager.deleteRecords. The share coordinator can delete all records before the latest ShareSnapshot for all active share-partitions. By taking periodic snapshots, the latest ShareSnapshot is replaced. For idle share-partitions, the share coordinator will periodically write a new ShareSnapshot so the older records can be pruned.

Using the share coordinator from the group coordinator

The group coordinator is responsible for asking the share coordinator to initialize and delete the durable share-partition state. The group coordinator uses the ShareGroupStatePartitionMetadata record to keep track of which share-partitions have been initialized.

The state is initialized in the following cases:

  • When a topic is added to the set of subscribed topics for a share group and is not yet in the ShareGroupStatePartitionMetadata record
  • When partitions are added to a topic which is already in the ShareGroupStatePartitionMetadata record
  • When KafkaAdmin.alterShareGroupOffsets  is used to reset the SPSO for a share-partition

The state is deleted in the following cases:

  • When a topic in the ShareGroupStatePartitionMetadata record is deleted
  • When KafkaAdmin.deleteShareGroupOffsets  is used to delete state for a share-partition, most likely as a result of an administrator cleaning up a topic which is no longer in use by this share group
  • When the share group is deleted

The group coordinator periodically reconciles its "hard" state in the ShareGroupStatePartitionMetadata with the "soft" state in the cluster metadata. This is how it observes relevant changes to topics, such as topic deletion. The ShareGroupStatePartitionMetadata contains the set of topic-partitions which are known to be initialized.

In order to add a topic, the group coordinator:

  • Sends the InitializeShareGroupState  RPC to the share coordinators for all of the partitions of the topic. This step can be repeated to handle failures, such as group coordinator failover.
  • When it has successful responses for all partitions, it writes a ShareGroupStatePartitionMetadata record with the topic added.

In order to add partitions to a known topic, the group coordinator:

  • Sends the InitializeShareGroupState  RPC to the share coordinators for all of the new partitions of the topic. This step can be repeated to handle failures, such as group coordinator failover.
  • When it has successful responses for all partitions, it writes a ShareGroupStatePartitionMetadata record with the array of partitions for the topic increased.

In order to remove a topic, the group coordinator:

  • If the topic still exists, it writes a ShareGroupStatePartitionMetadata record with the topic added to the DeletingTopics  array. This enables group coordinator failover to continue the deletion. If the topic does not exist, the requirement to continue deletion can be inferred by the non-existence of the topic.
  • Sends the DeleteShareGroupState  RPC to the share coordinators for all of the partitions of the topic. This step can be repeated to handle failures, such as group coordinator failover.
  • When it has successful responses for all partitions, it writes a ShareGroupStatePartitionMetadata record with the topic removed.

Using the share coordinator from the share-partition leader

When a share-partition leader receives its first ShareFetch request for a topic-partition, it needs to initialize its share-partition state. It finds the share coordinator using the FindCoordinator RPC using (key: "groupId:topicId:partition", key_type: SHARE ). Then, it sends the ReadShareGroupState RPC to the share coordinator. If the share coordinator has no share-partition state to return, it returns the UNKNOWN_TOPIC_OR_PARTITION error code indicating that this share-partition is not actually part of the share group. Otherwise, it returns the state to the share-partition leader which uses it to initialize and begin fetching records for the consumers. The SPSO returned might be -1 indicating that the initial SPSO needs to be set based on the group.share.auto.offset.reset configuration.

The share-partition leader must be aware of when the group coordinator is being used to alter the SPSO with a KafkaAdmin.alterShareGroupOffsets request. This only occurs when the group is empty. As a result, when the set of share sessions transitions from 0 to 1, the share-partition leader uses the ReadShareGroupStateSummary RPC to validate its state epoch (this request is much cheaper for the share coordinator to handle than ReadShareGroupState ). We know that there are no acquired records, so re-initializing the share-partition leader is non-disruptive. If the state epoch has changed, the share-partition leader issues a ReadShareGroupState RPC to the share coordinator and uses the response to re-initialize.  

When a share-partition leader needs to update the durable share-partition state because of an acknowledgement or other state changed (such as a lock timeout), it sends the WriteShareGroupState RPC to the share coordinator. The share coordinator keeps track of the accumulated state of the share-partition and chooses how to record it to the share-state topic. Once it has successfully written to the topic and replication has completed, the RPC response is sent.

When a share partition is removed from a share group, perhaps because the topic is deleted or the administrator deletes the share-partition offsets (see KafkaAdmin.deleteShareGroupOffsets), the share-partition leader sends the DeleteShareGroupState RPC to the share coordinator. The share coordinator writes a final ShareSnapshot record with the special snapshot epoch value of -1. This acts as a deletion marker so the recovery processing of the share-group state topic sees the share-partition has been removed. There is no need to retain the deletion marker. Its only purpose is to make sure that a share coordinator reading records for a share-partition before it was removed, notices that those records apply to a defunct share-partition.

Examples with share-group state

Here are some examples showing the writing of share-group state.

Operation

State changes

Cumulative state

WriteShareGroupState request

Starting state of topic-partition with latest offset 100

SPSO=100, SPEO=100

SPSO=100, SPEO=100


{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 100,
  "StateBatches": []
}


In the batched case with successful processing, there’s a state change per batch to move the SPSO forwards


Fetch records 100-109

SPEO=110, records 100-109 (acquired, delivery count 1)

SPSO=100, SPEO=110, records 100-109 (acquired, delivery count 1)


Acknowledge 100-109

SPSO=110

SPSO=110, SPEO=110


{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 110,
  "StateBatches": []
}


With a messier sequence of release and acknowledge, there’s a state change for each operation which can act on multiple records


Fetch records 110-119

Consumer 1 get 110-112, consumer 2 gets 113-118, consumer 3 gets 119

SPEO=120, records 110-119 (acquired, delivery count 1)

SPSO=110, SPEO=120, records 110-119 (acquired, delivery count 1)


Release 110 (consumer 1)

record 110 (available, delivery count 1)

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-119 (acquired, delivery count 1)


{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "FirstOffset": 110,
      "LastOffset": 110,
      "DeliveryState": 0 (Available),
      "DeliveryCount": 1
    }
  ]
}

Acknowledge 119 (consumer 3)

record 110 (available, delivery count 1), records 111-118 acquired, record 119 acknowledged

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-118 (acquired, delivery count 1), record 119 acknowledged


{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "FirstOffset": 119,
      "LastOffset": 119,
      "DeliveryState": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}


Fetch records 110, 120 (consumer 1)

SPEO=121, record 110 (acquired, delivery count 2), record 120 (acquired, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)


Lock timeout elapsed 111, 112 (consumer 1's records)

records 111-112 (available, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)


{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "FirstOffset": 111,
      "LastOffset": 112,
      "DeliveryState": 0 (Available),
      "DeliveryCount": 1
    }
  ]
}


Acknowledge 113-118 (consumer 2)

records 113-118 acknowledged

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-119 acknowledged, record 120 (acquired, delivery count 1)


{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "FirstOffset": 113,
      "LastOffset": 118,
      "DeliveryState": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}


Fetch records 111, 112 (consumer 3)

records 111-112 (acquired, delivery count 2)

SPSO=110, SPEO=121, record 110-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)


Acknowledge 110 (consumer 1)

SPSO=111

SPSO=111, SPEO=121, record 111-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)


{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "FirstOffset": 110,
      "LastOffset": 110,
      "DeliveryState": 2 (Acknowledged),
      "DeliveryCount": 2
    }
  ]
}


Acknowledge 111, 112 (consumer 3)

SPSO=120

SPSO=120, SPEO=121, record 120 (acquired, delivery count 1)


{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 120,
  "StateBatches": []
}


Administration

Several components work together to create share groups. The group coordinator is responsible for assignment, membership and the state of the group. The share-partition leaders are responsible for delivery and acknowledgement. The share coordinator is responsible for share-group persistent state.

The following table explains how the administration operations on share groups work.

OperationInvolvesNotes
Create share groupGroup coordinator

This occurs as a side-effect of the initial ShareGroupHeartbeat request.

a) The group coordinator serves the ShareGroupHeartbeat  RPC. It writes a ShareGroupMetadata record and a ShareGroupPartitionMetadata record for the share group, a ShareGroupMemberMetadata record for the member onto the __consumer_offsets  topic. The group has now been created but the share-partitions are still being initialized. The group coordinator responds to the ShareGroupHeartbeat  RPC, but the list of assignments is empty.

b) For each share-partition being initialized, the group coordinator sends an InitializeShareGroupState  to the share coordinator. The SPSO is not known yet and is initialized to -1. The group epoch is used as the state epoch.

c) The share coordinator serves the InitializeShareGroupState  RPC. It writes a ShareSnapshot record to the __share_group_state  topic. When the record is replicated, the share coordinator responds to the RPC.

d) Back in the group coordinator, it writes a ShareGroupPartitionStateMetadata record on the __consumer_offsets  topic for the share-partitions which are now initialized. The share-partitions are now able to be included in an assignment in the share group.

Assign a share-partitionGroup coordinator and optionally share coordinatorWhen a topic-partition is eligible to be assigned to a member of a share group for the first time, the group coordinator sends an InitializeShareGroupState  request to the share coordinator. The share coordinator writes a ShareSnapshot record to the __share_group_state  topic and responds to the group coordinator. The group coordinator writes an updated ShareGroupStatePartitionMetadata record, and the share-partition is now able to be included in an assignment in the share group.
List share groupsGroup coordinator
Describe share groupGroup coordinator
List share group offsetsGroup coordinator and share coordinatorThe admin client gets a list of share-partitions from the group coordinator. It then asks the group coordinator to request the SPSO of the share-partitions from the share coordinator using a ReadShareGroupStateSummary  request. Although the share-partition leader also knows this information, the share coordinator provides it here because when a share-partition is not used for a while, the share-partition leader frees up the memory, reloading it from the share-coordinator when it is next required.
Alter share group offsetsGroup coordinator and share coordinatorOnly empty share groups support this operation. The group coordinator bumps the group epoch, writes a ShareGroupMetadata, and sends an InitializeShareGroupState  request to the share coordinator. The share coordinator writes a ShareSnapshot record with the new state epoch to the __share_group_state  topic. If the partition was not previously initialized, the group coordinator writes an updated ShareGroupStatePartitionMetadata record.
Delete share group offsetsGroup coordinator and share coordinatorThis is administrative removal of a topic from a share group. Only empty share groups support this operation. The group coordinator writes a ShareGroupStatePartitionMetadata record to the __consumer_offsets  topic adding the topic to the DeletingTopics  array and removing it from the InitializedTopics  array. This enables an interrupted deletion to be completed. The group coordinator sends a DeleteShareGroupState  request to the share coordinator which writes tombstones to logically delete the state. Then the group coordinator writes an updated ShareGroupStatePartitionMetadata record to the __consumer_offsets  topic to complete the deletion of the topic from the share group.
Delete share groupGroup coordinator and share coordinator

Only empty share groups can be deleted. If the share group has any topics with initialized state, it writes a ShareGroupStatePartitionMetadata record to the __consumer_offsets  topic moving all initialized topics into the DeletingTopics  array, and then it sends a DeleteShareGroupState  request to each of the share coordinators for this share group's partitions, which write tombstones to logically delete the state from the __share_group_state  topic. Then, the group coordinator writes a tombstone ShareGroupStatePartitionMetadata and finally a tombstone ShareGroupMetadata record to the __consumer_offsets  topic.

Public Interfaces

This KIP introduces extensive additions to the public interfaces.

Client API changes

KafkaShareConsumer

This KIP introduces a new interface for consuming records from a share group called org.apache.kafka.clients.consumer.ShareConsumer  with an implementation called org.apache.kafka.clients.consumer.KafkaShareConsumer . The interface stability is Evolving .

@InterfaceStability.Evolving
public interface ShareConsumer<K, V> {

    /**
     * Get the current subscription. Will return the same topics used in the most recent call to
     * {@link #subscribe(Collection)}, or an empty set if no such call has been made.
     *
     * @return The set of topics currently subscribed to
     */
    Set<String> subscription();

    /**
     * Subscribe to the given list of topics to get dynamically assigned partitions.
     * <b>Topic subscriptions are not incremental. This list will replace the current
     * assignment, if there is one.</b> If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
     *
     * <p>
     * As part of group management, the coordinator will keep track of the list of consumers that belong to a particular
     * group and will trigger a rebalance operation if any one of the following events are triggered:
     * <ul>
     * <li>A member joins or leaves the share group
     * <li>An existing member of the share group is shut down or fails
     * <li>The number of partitions changes for any of the subscribed topics
     * <li>A subscribed topic is created or deleted
     * </ul>
     *
     * @param topics The list of topics to subscribe to
     *
     * @throws IllegalArgumentException If topics is null or contains null or empty elements
     * @throws KafkaException for any other unrecoverable errors
     */
    void subscribe(Collection<String> topics);

    /**
     * Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)}.
     *
     * @throws KafkaException for any other unrecoverable errors
     */
    void unsubscribe();

    /**
     * Fetch data for the topics specified using {@link #subscribe(Collection)}. It is an error to not have
     * subscribed to any topics before polling for data.
     *
     * <p>
     * This method returns immediately if there are records available. Otherwise, it will await the passed timeout.
     * If the timeout expires, an empty record set will be returned.
     *
     * @param timeout The maximum time to block (must not be greater than {@link Long#MAX_VALUE} milliseconds)
     *
     * @return map of topic to records since the last fetch for the subscribed list of topics
     *
     * @throws AuthenticationException if authentication fails. See the exception for more details
     * @throws AuthorizationException if caller lacks Read access to any of the subscribed
     *             topics or to the configured groupId. See the exception for more details
     * @throws InterruptException if the calling thread is interrupted before or while this method is called
     * @throws InvalidTopicException if the current subscription contains any invalid
     *             topic (per {@link org.apache.kafka.common.internals.Topic#validate(String)})
     * @throws WakeupException if {@link #wakeup()} is called before or while this method is called
     * @throws KafkaException for any other unrecoverable errors (e.g. invalid groupId or
     *             session timeout, errors deserializing key/value pairs,
     *             or any new error cases in future versions)
     * @throws IllegalArgumentException if the timeout value is negative
     * @throws IllegalStateException if the consumer is not subscribed to any topics
     * @throws ArithmeticException if the timeout is greater than {@link Long#MAX_VALUE} milliseconds.
     */
    ConsumerRecords<K, V> poll(Duration timeout);

    /**
     * Acknowledge successful delivery of a record returned on the last {@link #poll(Duration)} call.
     * The acknowledgement is committed on the next {@link #commitSync()}, {@link #commitAsync()} or
     * {@link #poll(Duration)} call.
     *
     * <p>
     * Records for each topic-partition must be acknowledged in the order they were returned from
     * {@link #poll(Duration)}. By using this method, the consumer is using
     * <b>explicit acknowledgement</b>.
     *
     * @param record The record to acknowledge
     *
     * @throws IllegalArgumentException if the record being acknowledged doesn't meet the ordering requirement
     * @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer has already
     *                               used implicit acknowledgement
     */
    void acknowledge(ConsumerRecord<K, V> record);

    /**
     * Acknowledge delivery of a record returned on the last {@link #poll(Duration)} call indicating whether
     * it was processed successfully. The acknowledgement is committed on the next {@link #commitSync()},
     * {@link #commitAsync()} or {@link #poll(Duration)} call. By using this method, the consumer is using
     * <b>explicit acknowledgement</b>.
     *
     * <p>
     * Records for each topic-partition must be acknowledged in the order they were returned from
     * {@link #poll(Duration)}.
     *
     * @param record The record to acknowledge
     * @param type The acknowledge type which indicates whether it was processed successfully
     *
     * @throws IllegalArgumentException if the record being acknowledged doesn't meet the ordering requirement
     * @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer has already
     *                               used implicit acknowledgement
     */
    void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type);

    /**
     * Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement,
     * the acknowledgements to commit have been indicated using {@link #acknowledge(ConsumerRecord)} or
     * {@link #acknowledge(ConsumerRecord, AcknowledgeType)}. If the consumer is using implicit acknowledgement,
     * all the records returned by the latest call to {@link #poll(Duration)} are acknowledged.
     * <p>
     * This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is
     * encountered (in which case it is thrown to the caller), or the timeout expires.
     *
     * @return A map of the results for each topic-partition for which delivery was acknowledged.
     *         If the acknowledgement failed for a topic-partition, an exception is present.
     *
     * @throws InterruptException If the thread is interrupted while blocked.
     * @throws KafkaException for any other unrecoverable errors
     */
    Map<TopicIdPartition, Optional<KafkaException>> commitSync();

    /**
     * Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement,
     * the acknowledgements to commit have been indicated using {@link #acknowledge(ConsumerRecord)} or
     * {@link #acknowledge(ConsumerRecord, AcknowledgeType)}. If the consumer is using implicit acknowledgement,
     * all the records returned by the latest call to {@link #poll(Duration)} are acknowledged.
     * <p>
     * This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is
     * encountered (in which case it is thrown to the caller), or the timeout expires.
     *
     * @param timeout The maximum amount of time to await completion of the acknowledgement
     *
     * @return A map of the results for each topic-partition for which delivery was acknowledged.
     *         If the acknowledgement failed for a topic-partition, an exception is present.
     *
     * @throws IllegalArgumentException If the {@code timeout} is negative.
     * @throws InterruptException If the thread is interrupted while blocked.
     * @throws KafkaException for any other unrecoverable errors
     */
    Map<TopicIdPartition, Optional<KafkaException>> commitSync(Duration timeout);

    /**
     * Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement,
     * the acknowledgements to commit have been indicated using {@link #acknowledge(ConsumerRecord)} or
     * {@link #acknowledge(ConsumerRecord, AcknowledgeType)}. If the consumer is using implicit acknowledgement,
     * all the records returned by the latest call to {@link #poll(Duration)} are acknowledged.
     *
     * @throws KafkaException for any other unrecoverable errors
     */
    void commitAsync();

    /**
     * Sets the acknowledgement commit callback which can be used to handle acknowledgement completion.
     *
     * @param callback The acknowledgement commit callback
     */
    void setAcknowledgementCommitCallback(AcknowledgementCommitCallback callback);

    /**
     * Determines the client's unique client instance ID used for telemetry. This ID is unique to
     * this specific client instance and will not change after it is initially generated.
     * The ID is useful for correlating client operations with telemetry sent to the broker and
     * to its eventual monitoring destinations.
     * <p>
     * If telemetry is enabled, this will first require a connection to the cluster to generate
     * the unique client instance ID. This method waits up to {@code timeout} for the consumer
     * client to complete the request.
     * <p>
     * Client telemetry is controlled by the {@link ConsumerConfig#ENABLE_METRICS_PUSH_CONFIG}
     * configuration option.
     *
     * @param timeout The maximum time to wait for consumer client to determine its client instance ID.
     *                The value must be non-negative. Specifying a timeout of zero means do not
     *                wait for the initial request to complete if it hasn't already.
     *
     * @return The client's assigned instance id used for metrics collection.
     *
     * @throws IllegalArgumentException If the {@code timeout} is negative.
     * @throws IllegalStateException If telemetry is not enabled because config `{@code enable.metrics.push}`
     *                               is set to `{@code false}`.
     * @throws InterruptException If the thread is interrupted while blocked.
     * @throws KafkaException If an unexpected error occurs while trying to determine the client
     *                        instance ID, though this error does not necessarily imply the
     *                        consumer client is otherwise unusable.
     */
    Uuid clientInstanceId(Duration timeout);

    /**
     * Get the metrics kept by the consumer
     */
    Map<MetricName, ? extends Metric> metrics();

    /**
     * Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup.
     * This will commit acknowledgements if possible within the default timeout.
     * See {@link #close(Duration)} for details. Note that {@link #wakeup()} cannot be used to interrupt close.
     *
     * @throws InterruptException If the thread is interrupted before or while this method is called
     * @throws KafkaException for any other error during close
     */
    void close();

    /**
     * Tries to close the consumer cleanly within the specified timeout. This method waits up to
     * {@code timeout} for the consumer to complete acknowledgements and leave the group.
     * If the consumer is unable to complete acknowledgements and gracefully leave the group
     * before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be
     * used to interrupt close.
     *
     * @param timeout The maximum time to wait for consumer to close gracefully. The value must be
     *                non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
     *
     * @throws IllegalArgumentException If the {@code timeout} is negative.
     * @throws InterruptException If the thread is interrupted before or while this method is called
     * @throws KafkaException for any other error during close
     */
    void close(Duration timeout);

    /**
     * Wake up the consumer. This method is thread-safe and is useful in particular to abort a long poll.
     * The thread which is blocking in an operation will throw {@link WakeupException}.
     * If no thread is blocking in a method which can throw {@link WakeupException},
     * the next call to such a method will raise it instead.
     */
   void wakeup();
}

The following constructors are provided for KafkaShareConsumer .

Method signatureDescription
KafkaShareConsumer(Map<String, Object> configs)
Constructor
KafkaShareConsumer(Properties properties)
Constructor
KafkaShareConsumer(Map<String, Object> configs,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer)
Constructor
KafkaShareConsumer(Properties properties,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer)
Constructor

AcknowledgementCommitCallback

The new org.apache.kafka.clients.consumer.AcknowledgementCommitCallback  can be implemented by the user to execute when acknowledgement completes. It is called on the application thread and is not permitted to call the methods of KafkaShareConsumer with the exception of KafkaShareConsumer.wakeup().

Method signatureDescription
void onComplete(Map<TopicIdPartition, Set<Long>> offsets, Exception exception) 

A callback method the user can implement to provide asynchronous handling of acknowledgement completion.

Parameters:

offsets - A map of the offsets that this callback applies to.

exception - The exception thrown during processing of the request, or null if the acknowledgement completed successfully.

Exceptions:

InvalidRecordStateException - if the record state prevented the acknowledgement from succeeding.

WakeupException - if KafkaShareConsumer.wakeup() is called.

InterruptException - if the calling thread is interrupted.

AuthorizationException - if not authorized to the topic or group.

KafkaException - for any other unrecoverable errors.

ConsumerRecord

Add the following method on the org.apache.kafka.client.consumer.ConsumerRecord  class.

Method signatureDescription

Optional<Short> deliveryCount() 

Get the delivery count for the record if available.

The delivery count is available for records delivered using a share group and Optional.empty() otherwise.

A new constructor is also added:

   /**
     * Creates a record to be received from a specified topic and partition
     *
     * @param topic The topic this record is received from
     * @param partition The partition of the topic this record is received from
     * @param offset The offset of this record in the corresponding Kafka partition
     * @param timestamp The timestamp of the record.
     * @param timestampType The timestamp type
     * @param serializedKeySize The length of the serialized key
     * @param serializedValueSize The length of the serialized value
     * @param key The key of the record, if one exists (null is allowed)
     * @param value The record contents
     * @param headers The headers of the record
     * @param leaderEpoch Optional leader epoch of the record (may be empty for legacy record formats)
     * @param deliveryCount Optional delivery count of the record (may be empty when deliveries not counted)
     */
    public ConsumerRecord(String topic,
                          int partition,
                          long offset,
                          long timestamp,
                          TimestampType timestampType,
                          int serializedKeySize,
                          int serializedValueSize,
                          K key,
                          V value,
                          Headers headers,
                          Optional<Integer> leaderEpoch,
                          Optional<Short> deliveryCount)

AcknowledgeType

The new org.apache.kafka.clients.consumer.AcknowledgeType  enum distinguishes between the types of acknowledgement for a record consumer using a share group.

Enum constantDescription
ACCEPT  (1)The record was consumed successfully
RELEASE  (2)The record was not consumed successfully. Release it for another delivery attempt.
REJECT  (3)The record was not consumed successfully. Reject it and do not release it for another delivery attempt.

AdminClient

Add the following methods on the org.apache.kafka.client.admin.AdminClient  interface.

Method signatureDescription
AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets)Alter offset information for a share group.
AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets, AlterShareGroupOffsetsOptions options) Alter offset information for a share group.
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<String> topics)Delete offset information for a set of topics in a share group.
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<String> topics, DeleteShareGroupOffsetsOptions options) Delete offset information for a set of topics in a share group.
DeleteShareGroupResult deleteShareGroups(Collection<String> groupIds)Delete share groups from the cluster.
DeleteShareGroupResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupOptions options) Delete share groups from the cluster.
DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds)Describe some share groups in the cluster.
DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds, DescribeShareGroupsOptions options) Describe some share groups in the cluster.
ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs)List the share group offsets available in the cluster for the specified share groups.
ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs, ListShareGroupOffsetsOptions options) List the share group offsets available in the cluster for the specified share groups.
ListShareGroupsResult listShareGroups()List the share groups available in the cluster.
ListShareGroupsResult listShareGroups(ListShareGroupsOptions options) List the share groups available in the cluster.

The equivalence between the consumer group and share group interfaces is clear. There are some differences:

  • Altering the offsets for a share group resets the Share-Partition Start Offset for topic-partitions in the share group (share-partitions)
  • The members of a share group are not assigned distinct sets of partitions
  • A share group has only three states - EMPTYSTABLE and DEAD 

Here are the method signatures.

    /**
     * Alters offsets for the specified group. In order to succeed, the group must be empty.
     *
     * <p>This is a convenience method for {@link #alterShareGroupOffsets(String, Map, AlterShareGroupOffsetsOptions)} with default options.
     * See the overload for more details.
     *
     * @param groupId The group for which to alter offsets.
     * @param offsets A map of offsets by partition.
     * @return The AlterShareGroupOffsetsResult.
     */
    default AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets) {
        return alterShareGroupOffsets(groupId, offsets, new AlterShareGroupOffsetsOptions());
    }

    /**
     * Alters offsets for the specified group. In order to succeed, the group must be empty.
     *
     * <p>This operation is not transactional so it may succeed for some partitions while fail for others.
     *
     * @param groupId The group for which to alter offsets.
     * @param offsets A map of offsets by partition. Partitions not specified in the map are ignored.
     * @param options The options to use when altering the offsets.
     * @return The AlterShareGroupOffsetsResult.
     */
    AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets, AlterShareGroupOffsetsOptions options);

    /**
     * Delete offsets for a set of topics in a share group with the default options.
     *
     * <p>This is a convenience method for {@link #deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} with default options.
     * See the overload for more details.
     *
     * @param groupId The group for which to delete offsets.
     * @param topics The topics.
     * @return The DeleteShareGroupOffsetsResult.
     */
    default DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<String> topics) {
        return deleteShareGroupOffsets(groupId, topics, new DeleteShareGroupOffsetsOptions());
    }

    /**
     * Delete offsets for a set of topics in a share group.
     *
     * @param groupId The group for which to delete offsets.
     * @param topics The topics.
     * @param options The options to use when deleting offsets in a share group.
     * @return The DeleteShareGroupOffsetsResult.
     */
    DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId,
        Set<String> topics,
        DeleteShareGroupOffsetsOptions options);

    /**
     * Delete share groups from the cluster with the default options.
     *
     * <p>This is a convenience method for {@link #deleteShareGroups(Collection<String>, DeleteShareGroupsOptions)} with default options.
     * See the overload for more details.
     *
     * @param groupIds The IDs of the groups to delete.
     * @return The DeleteShareGroupsResult.
     */
    default DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds) {
        return deleteShareGroups(groupIds, new DeleteShareGroupsOptions());
    }

    /**
     * Delete share groups from the cluster.
     *
     * @param groupIds The IDs of the groups to delete.
     * @param options The options to use when deleting a share group.
     * @return The DeleteShareGroupsResult.
     */
    DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options);

    /**
     * Describe some share groups in the cluster, with the default options.
     *
     * <p>This is a convenience method for {@link #describeShareGroups(Collection, DescribeShareGroupsOptions)}
     * with default options. See the overload for more details.
     *
     * @param groupIds The IDs of the groups to describe.
     * @return The DescribeShareGroupsResult.
     */
    default DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds) {
        return describeShareGroups(groupIds, new DescribeShareGroupsOptions());
    }

    /**
     * Describe some share groups in the cluster.
     *
     * @param groupIds The IDs of the groups to describe.
     * @param options  The options to use when describing the groups.
     * @return The DescribeShareGroupsResult.
     */
    DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds,
                                                 DescribeShareGroupsOptions options);

    /**
     * List the share group offsets available in the cluster for the specified share groups with the default options.
     *
     * <p>This is a convenience method for {@link #listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)}
     * to list offsets of all partitions for the specified share groups with default options.
     *
     * @param groupSpecs Map of share group ids to a spec that specifies the topic partitions of the group to list offsets for.
     * @return The ListShareGroupOffsetsResult
     */
    default ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs) {
        return listShareGroupOffsets(groupSpecs, new ListShareGroupOffsetsOptions());
    }

    /**
     * List the share group offsets available in the cluster for the specified share groups.
     *
     * @param groupSpecs Map of share group ids to a spec that specifies the topic partitions of the group to list offsets for.
     * @param options The options to use when listing the share group offsets.
     * @return The ListShareGroupOffsetsResult
     */
    ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs, ListShareGroupOffsetsOptions options);

    /**
     * List the share groups available in the cluster with the default options.
     *
     * <p>This is a convenience method for {@link #listShareGroups(ListShareGroupsOptions)} with default options.
     * See the overload for more details.
     *
     * @return The ListShareGroupsResult.
     */
    default ListShareGroupsResult listShareGroups() {
        return listShareGroups(new ListShareGroupsOptions());
    }

    /**
     * List the share groups available in the cluster.
     *
     * @param options The options to use when listing the share groups.
     * @return The ListShareGroupsResult.
     */
    ListShareGroupsResult listShareGroups(ListShareGroupsOptions options);

AlterShareGroupOffsetsResult

package org.apache.kafka.clients.admin;

/**
 * The result of the {@link Admin#alterShareGroupOffsets(String groupId, Map<TopicPartition, Long>), AlterShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class AlterShareGroupOffsetsResult {
    /**
     * Return a future which succeeds if all the alter offsets succeed.
     */
    public KafkaFuture<Void> all() {
    }

    /**
     * Return a future which can be used to check the result for a given partition.
     */
    public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
    }
}

AlterShareGroupOffsetsOptions

package org.apache.kafka.client.admin;
 
/**
 * Options for the {@link Admin#alterShareGroupOffsets(String groupId, Map<TopicPartition, Long>), AlterShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class AlterShareGroupOffsetsOptions extends AbstractOptions<AlterShareGroupOffsetsOptions> {
}

DeleteShareGroupOffsetsResult

package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#deleteShareGroupOffsets(String, Set<String>, DeleteShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupOffsetsResult {
    /**
     * Return a future which succeeds only if all the deletions succeed.
     */
    public KafkaFuture<Void> all() {
    }

    /**
     * Return a future which can be used to check the result for a given topic.
     */
    public KafkaFuture<Void> partitionResult(final String topic) {
    }
}

DeleteShareGroupOffsetsOptions

package org.apache.kafka.client.admin;
 
/**
 * Options for the {@link Admin#deleteShareGroupOffsets(String, Set<String>, DeleteShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupOffsetsOptions extends AbstractOptions<DeleteShareGroupOffsetsOptions> {
}

DeleteShareGroupsResult

package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#deleteShareGroups(Collection<String>, DeleteShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupsResult {
    /**
     * Return a future which succeeds only if all the deletions succeed.
     */
    public KafkaFuture<Void> all() {
    }

    /**
     * Return a map from group id to futures which can be used to check the status of individual deletions.
     */
    public Map<String, KafkaFuture<Void>> deletedGroups() {
    }
}

DeleteShareGroupsOptions

package org.apache.kafka.client.admin;
 
/**
 * Options for the {@link Admin#deleteShareGroups(Collection<String>, DeleteShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupsOptions extends AbstractOptions<DeleteShareGroupsOptions> {
}

DescribeShareGroupsResult

package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#describeShareGroups(Collection<String>, DescribeShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DescribeShareGroupsResult {
    /**
     * Return a future which yields all ShareGroupDescription objects, if all the describes succeed.
     */
    public KafkaFuture<Map<String, ShareGroupDescription>> all() {
    }

    /**
     * Return a map from group id to futures which yield group descriptions.
     */
    public Map<String, KafkaFuture<ShareGroupDescription>> describedGroups() {
    }
}

ShareGroupDescription

This class does indeed reuse the MemberDescription  class intended for consumer groups. It is sufficiently general to work for share groups also.

package org.apache.kafka.client.admin;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.ShareGroupState;
import org.apache.kafka.common.acl.AclOperation;

/**
 * A detailed description of a single share group in the cluster.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ShareGroupDescription {
  public ShareGroupDescription(String groupId, Collection<MemberDescription> members, ShareGroupState state, Node coordinator);
  public ShareGroupDescription(String groupId, Collection<MemberDescription> members, ShareGroupState state, Node coordinator, Set<AclOperation> authorizedOperations);

  /**
   * The id of the share group.
   */
  public String groupId();

  /**
   * A list of the members of the share group.
   */
  public Collection<MemberDescription> members();

  /**
   * The share group state, or UNKNOWN if the state cannot be parsed.
   */
  public ShareGroupState state();

  /**
   * The group coordinator, or null if the coordinator is not known.
   */
  public Node coordinator();

  /**
   * The authorized operations for this group, or null if that information is not known.
   */
  public Set<AclOperation> authorizedOperations();
}

DescribeShareGroupsOptions

package org.apache.kafka.client.admin;
 
/**
 * Options for {@link Admin#describeShareGroups(Collection<String>, DescribeShareGroupsOptions)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DescribeShareGroupsOptions extends AbstractOptions<DescribeShareGroupsOptions> {
    public DescribeShareGroupsOptions includeAuthorizedOperations(boolean includeAuthorizedOperations);

    public boolean includeAuthorizedOperations();
}

ListShareGroupOffsetsResult

The offset returned for each topic-partition is the share-partition start offset (SPSO).

package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec>, ListShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupOffsetsResult {
    /**
     * Return a future which yields all Map<String, Map<TopicPartition, Long> objects, if requests for all the groups succeed.
     */
    public KafkaFuture<Map<String, Map<TopicPartition, Long>>> all() {
    }

    /**
     * Return a future which yields a map of topic partitions to offsets for the specified group.
     */
    public KafkaFuture<Map<TopicPartition, Long>> partitionsToOffset(String groupId) {
    }
}

ListShareGroupOffsetsOptions

package org.apache.kafka.client.admin;
 
/**
 * Options for {@link Admin#listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec>, ListShareGroupOffsetsOptions)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupOffsetsOptions extends AbstractOptions<ListShareGroupOffsetsOptions> {
}

ListShareGroupOffsetsSpec

package org.apache.kafka.client.admin;
 
/**
 * Specification of share group offsets to list using {@link Admin#listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec>, ListShareGroupOffsetsOptions)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupOffsetsSpec {
  public ListShareGroupOffsetsSpec();

  /**
   * Set the topic partitions whose offsets are to be listed for a share group.
   */
  ListShareGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions);

  /**
   * Returns the topic partitions whose offsets are to be listed for a share group.
   */
  Collection<TopicPartition> topicPartitions();
}

ListShareGroupsResult

package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#listShareGroups(ListShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupsResult {
    /**
     * Returns a future that yields either an exception, or the full set of share group listings.
     */
    public KafkaFuture<Collection<ShareGroupListing>> all() {
    }

    /**
     * Returns a future which yields just the valid listings.
     */
    public KafkaFuture<Collection<ShareGroupListing>> valid() {
    }
 
    /**
     * Returns a future which yields just the errors which occurred.
     */
    public KafkaFuture<Collection<Throwable>> errors() {
    }
}

ShareGroupListing

package org.apache.kafka.client.admin;

import org.apache.kafka.common.ShareGroupState;

/**
 * A listing of a share group in the cluster.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ShareGroupListing {
  public ShareGroupListing(String groupId);
  public ShareGroupListing(String groupId, Optional<ShareGroupState> state);

  /**
   * The id of the share group.
   */
  public String groupId();

  /**
   * The share group state.
   */
  public Optional<ShareGroupState> state();
}

ListShareGroupsOptions

package org.apache.kafka.client.admin;

import org.apache.kafka.common.ShareGroupState;

/**
 * Options for {@link Admin#listShareGroups(ListShareGroupsOptions)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupsOptions extends AbstractOptions<ListShareGroupsOptions> {
    /**
     * If states is set, only groups in these states will be returned. Otherwise, all groups are returned.
     */
    public ListShareGroupsOptions inStates(Set<ShareGroupState> states);

    /**
     * Return the list of States that are requested or empty if no states have been specified.
     */
    public Set<ShareGroupState> states();
}

GroupType

Another case is added to the org.apache.kafka.common.GroupType  enum:

Enum constantDescription
SHARE("share") Share group

ShareGroupState

A new enum org.apache.kafka.common.ShareGroupState  is added:

Enum constant

DEAD 

EMPTY 

STABLE 

UNKNOWN 

Its definition follows the pattern of ConsumerGroupState with fewer states.

Exceptions

The following new exceptions are added to the org.apache.kafka.common.errors  package corresponding to the new error codes in the Kafka protocol.

  • InvalidRecordStateException  - The record state is invalid. The acknowledgement of delivery could not be completed.
  • ShareSessionNotFoundException  - The share session is not found.
  • InvalidShareSessionEpochException  - The share session epoch is invalid.
  • FencedStateEpochException  - The share coordinator rejected the request because the share-group state epoch did not match.

They are all subclasses of RetriableException .

Broker API

ConsumerGroupPartitionAssignor

The new org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor  interface is an interface implemented by server-side assignors for consumer groups. It signifies that the partition assignor is suitable for use with consumer groups.

package org.apache.kafka.coordinator.group.assignor;

import org.apache.kafka.common.annotation.InterfaceStability;

/**
 * Server-side partition assignor for consumer groups used by the GroupCoordinator.
 *
 * The interface is kept in an internal module until KIP-848 is fully
 * implemented and ready to be released.
 */
@InterfaceStability.Unstable
public interface ConsumerGroupPartitionAssignor extends PartitionAssignor {
}

The two built-in partition assignors for consumer groups, org.apache.kafka.coordinator.group.assignor.RangeAssignor  and org.apache.kafka.coordinator.group.assignor.UniformAssignor , are both changed to implement this interface instead of org.apache.kafka.coordinator.group.assignor.PartitionAssignor  because they are intended only for use with consumer groups.

ShareGroupPartitionAssignor

The new org.apache.kafka.coordinator.group.assignor.ShareGroupPartitionAssignor  interface is an interface implemented by server-side assignors for share groups. It signifies that the partition assignor is suitable for use with share groups.

package org.apache.kafka.coordinator.group.assignor;

import org.apache.kafka.common.annotation.InterfaceStability;

/**
 * Server-side partition assignor for share groups used by the GroupCoordinator.
 *
 * The interface is kept in an internal module until KIP-932 is fully
 * implemented and ready to be released.
 */
@InterfaceStability.Unstable
public interface ShareGroupPartitionAssignor extends PartitionAssignor {
}

One implementation of this interface, org.apache.kafka.coordinator.group.share.SimpleAssignor, is provided.

Command-line tools

kafka-share-groups.sh

A new tool called kafka-share-groups.sh is added for working with share groups. It has the following options:

Option

Description

--all-topics

Consider all topics assigned to a group in the `reset-offsets` process.

--bootstrap-server <String: server to connect to>

REQUIRED: The server(s) to connect to.

--command-config <String: command config property file>

Property file containing configs to be passed to Admin Client.

--delete

Delete share group.

--delete-offsets

Delete offsets of share group. Supports one share group at the time, and multiple topics.

--describe

Describe share group, members and offset information.

--dry-run

Only show results without executing changes on share groups. Supported operations: reset-offsets.

--execute

Execute operation. Supported operations: reset-offsets.

--group <String: share group>

The share group we wish to act on.

--help

Print usage information.

--list

List all share groups.

--members

Describe members of the group. This option may be used with the '--describe' option only.

--offsets

Describe the group and list all topic partitions in the group along with their offset lag. This is the default sub-action of and may be used with the '--describe' option only.

--reset-offsets

Reset offsets of share group. Supports one share group at a time, and instances must be inactive. Has 2 execution options, '--dry-run' (the default) and '–execute'. Has 3 reset options: '--to-earliest', '--to-latest' and '--to-datetime'. 

--state [String]

When specified with '--describe', includes the state of the group. When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states. The valid values are 'Empty', 'Stable' and 'Dead'.

--timeout <Long: timeout (ms)>

The timeout that can be set for some use cases. For example, it can be used when describing the group to specify the maximum amount of time in milliseconds to wait before the group stabilizes. (default: 5000)   

--to-datetime <String: datetime>

Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'.

--to-earliest

Reset offsets to earliest offset.

--to-latest

Reset offsets to latest offset.

--topic <String: topic>

The topic whose share group information should be deleted or included in the reset offset process. When resetting offsets, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partitions to be included.

--version

Display Kafka version.

Here are some examples. 

To display a list of all share groups:

$ kafka-share-groups.sh --bootstrap-server localhost:9092 --list

To delete the information for topic T1  from inactive share group S1 , which essentially resets the consumption of this topic in the share group:

$ kafka-share-groups.sh --bootstrap-server localhost:9092 --group S1 --topic T1 --delete-offsets

To set the starting offset for consuming topic T1  in inactive share group S1  to a specific date and time:

$ kafka-share-groups.sh --bootstrap-server localhost:9092 --group S1 --topic T1 --reset-offsets --to-datetime 1999-12-31T23:57:00.000 --execute

kafka-console-share-consumer.sh

A new tool called kafka-console-share-consumer.sh  is added for reading data from Kafka topics using a share group and outputting to standard output. This is similar to kafka-console-consumer.sh  but using a share group and supporting the various acknowledge modes. It has the following options:

OptionDescription
--bootstrap-server <String: server to connect to>REQUIRED: The server(s) to connect to.
--consumer-config <String: config file>Consumer config properties file. Note that [consumer-property] takes precedence over this config.
--consumer-property <String: consumer_prop>

Consumer property in the form key=value.

--enable-systest-events

Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.)

--formatter <String: class>

The name of a class to use for formatting Kafka messages for display. (default: kafka.tools.DefaultMessageFormatter)

--formatter-config <String: config file>

Config properties file to initialize the message formatter. Note that [property] takes precedence of this config.

--group <String: share groud id>

The share group id of the consumer. (default: "console-share-consumer" )

--help

Print usage information.

--key-deserializer <String: deserializer for keys>

The name of the class to use for deserializing keys.

--max-messages <Integer: num_messages>

The maximum number of messages to consume before exiting. If not set, consumption is continual.

--property <String: prop>

The properties to initialize the message formatter. Default properties include:

 print.timestamp=true|false

 print.key=true|false

 print.offset=true|false

 print.delivery=true|false

 print.partition=true|false

 print.headers=true|false

 print.value=true|false

 key.separator=<key.separator>

 line.separator=<line.separator>

 headers.separator=<line.separator>

 null.literal=<null.literal>

 key.deserializer=<key.deserializer>

 value.deserializer=<value.deserializer>

 header.deserializer=<header.deserializer>

Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure their deserializers.

--reject

If specified, messages are rejected as they are consumed.

--reject-message-on-error

If there is an error when processing a message, reject it instead of halting.

--release

If specified, messages are released as they are consumed.

--timeout-ms <Integer: timeout_ms>

If specified, exit if no message is available for consumption for the specific interval.

--topic <String: topic>

REQUIRED: The topic to consume from.

--value-deserializer <String: deserializer for values>

The name of the class to use for deserializing values.

--version

Display Kafka version.

kafka-producer-perf-test.sh

The following enhancements are made to the kafka-producer-perf-test.sh tool. The changes are intended to make this tool useful for observing the operation of share groups by generating a low message rate with predictable message payloads.

OptionDescription
--throughput THROUGHPUT(Existing option) Enhanced to permit fractional rates, such as 0.5 meaning 1 message every 2 seconds.
--payload-monotonicpayload is monotonically increasing integer.

Configuration

Broker configuration

ConfigurationDescriptionValues
group.coordinator.rebalance.protocols The list of enabled rebalance protocols. (Existing configuration)

"share"  is included in the list of protocols to enable share groups.

This will be added to the default value of this configuration property once this feature is complete.

group.share.delivery.count.limitThe maximum number of delivery attempts for a record delivered to a share group.Default 5, minimum 2, maximum 10
group.share.record.lock.duration.msShare-group record acquisition lock duration in milliseconds.Default 30000 (30 seconds), minimum 1000 (1 second), maximum 60000 (60 seconds)
group.share.min.record.lock.duration.ms Share-group record acquisition lock minimum duration in milliseconds.Default 15000 (15 seconds), minimum 1000 (1 second), maximum 30000 (30 seconds)
group.share.max.record.lock.duration.msShare-group record acquisition lock maximum duration in milliseconds.Default 60000 (60 seconds), minimum 30000 (30 seconds), maximum 3600000 (1 hour)
group.share.partition.max.record.locksShare-group record lock limit per share-partition.Default 200, minimum 100, maximum 10000
group.share.session.timeout.ms 

The timeout to detect client failures when using the group protocol.

Default 45000 (45 seconds)
group.share.min.session.timeout.ms 

The minimum session timeout.

Default 45000 (45 seconds)
group.share.max.session.timeout.ms 

The maximum session timeout.

Default 60000 (60 seconds)
group.share.heartbeat.interval.ms 

The heartbeat interval given to the members.

Default 5000 (5 seconds)
group.share.min.heartbeat.interval.ms 

The minimum heartbeat interval.

Default 5000 (5 seconds)
group.share.max.heartbeat.interval.ms 

The maximum heartbeat interval.

Default 15000 (15 seconds)
group.share.max.groups 

The maximum number of share groups.

Default 10, minimum 1, maximum 100
group.share.max.size 

The maximum number of consumers that a single share group can accommodate.

Default 200, minimum 10, maximum 1000
group.share.assignors 

The server-side assignors as a list of full class names. The list must contain only a single entry which is used by all groups. In the future, it is envisaged that a group configuration will be provided to allow each group to choose one of the list of assignors.

A list of class names, currently limited to a single entry. Default "org.apache.kafka.coordinator.group.share.SimpleAssignor"
share.coordinator.state.topic.num.partitions 

The number of partitions for the share-group state topic (should not change after deployment).

Default 50
share.coordinator.state.topic.replication.factor 

The replication factor for the share-group state topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.

Default 3 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use)
share.coordinator.state.topic.segment.bytes 

The log segment size for the share-group state topic.

Default 104857600
share.coordinator.state.topic.min.isr 

Overridden min.insync.replicas for the share-group state topic.

Default 2 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use)
share.coordinator.state.topic.compression.codec 

Compression codec for the share-group state topic.

Default 0 (NONE)
share.coordinator.append.linger.ms 

The duration in milliseconds that the share coordinator will wait for writes to accumulate before flushing them to disk.

Default 10, minimum 0
share.coordinator.load.buffer.size 

Batch size for reading from the share-group state topic when loading state information into the cache (soft-limit, overridden if records are too large).

Default 5 * 1024 * 1024 (5242880), minimum 1
share.coordinator.snapshot.update.records.per.snapshot 

The number of update records the share coordinator writes between snapshot records.

Default 500, minimum 0
share.coordinator.threads 

The number of threads used by the share coordinator.

Default 1, minimum 1
share.coordinator.write.timeout.ms 

The duration in milliseconds that the share coordinator will wait for all replicas of the share-group state topic to receive a write.

Default 5000, minimum 1
share.fetch.purgatory.purge.interval.requests 

The purge interval (in number of requests) of the share fetch request purgatory

Default 1000

Group configuration

The following dynamic group configuration properties are added. These are properties for which it would be problematic to have consumers in the same share group using different behavior if the properties were specified in the consumer clients themselves.

ConfigurationDescriptionValues
group.share.isolation.level 

Controls how to read records written transactionally. If set to "read_committed", the share group will only deliver transactional records which have been committed. If set to "read_uncommitted", the share group will return all messages, even transactional messages which have been aborted. Non-transactional records will be returned unconditionally in either mode.

Valid values "read_committed"  and "read_uncommitted" (default)

group.share.auto.offset.reset 

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server:

  • "earliest" : automatically reset the offset to the earliest offset

  • "latest" : automatically reset the offset to the latest offset

Valid values "latest"  (default) and "earliest" 

group.share.record.lock.duration.ms 

Record acquisition lock duration in milliseconds.

If not specified, uses the broker configuration group.share.record.lock.duration.ms.

If specified, minimum limited by the broker configuration group.share.min.record.lock.duration.ms , maximum limited by the broker configuration group.share.max.record.lock.duration.ms

group.share.heartbeat.interval.ms 

The heartbeat interval given to the members.

If not specified, uses the broker configuration group.share.heartbeat.interval.ms .

If specified, minimum limited by the broker configuration group.share.min.heartbeat.interval.ms , maximum limited by the broker configuration group.share.max.heartbeat.interval.ms

group.share.session.timeout.ms 

The timeout to detect client failures when using the share group protocol.

If not specified, uses the broker configuration group.share.session.timeout.ms .

If specified, minimum limited by the broker configuration group.share.min.session.timeout.ms , maximum limited by the broker configuration group.share.max.session.timeout.ms .

Consumer configuration

The existing consumer configurations apply for share groups with the following exceptions:

  • auto.offset.reset : this is handled by a dynamic group configuration group.share.auto.offset.reset 
  • enable.auto.commit  and auto.commit.interval.ms : share groups do not support auto-commit
  • group.instance.id : this concept is not supported by share groups
  • isolation.level : this is handled by a dynamic group configuration group.share.isolation.level 
  • partition.assignment.strategy : share groups do not support client-side partition assignors
  • interceptor.classes : interceptors are not supported
  • protocol.type : this configuration is used to select the group protocol used for KafkaConsumer
  • session.timeout.ms : this is deprecated in KIP-848 and is not supported for share groups
  • heartbeat.interval.ms : this is deprecated in KIP-848 and is not supported for share groups

Kafka protocol changes

This KIP introduces the following new APIs:

  • ShareGroupHeartbeat - for consumers to form and maintain share groups
  • ShareGroupDescribe - for describing share groups
  • ShareFetch - for fetching records from share-partition leaders
  • ShareAcknowledge - for acknowledging delivery of records with share-partition leaders
  • AlterShareGroupOffsets - for altering the share-partition start offsets for the share-partitions in a share group
  • DeleteShareGroupOffsets - for deleting the offsets for the share-partitions in a share group
  • DescribeShareGroupOffsets - for describing the offsets for the share-partitions in a share group
  • InitializeShareGroupState  - for initializing share-partition state on a share-coordinator
  • ReadShareGroupState - for reading share-partition state from a share coordinator

  • WriteShareGroupState - for writing share-partition state to a share coordinator

  • DeleteShareGroupState - for deleting share-partition state from a share coordinator

  • ReadShareGroupStateSummary  - for reading a summary of the share-partition state from a share coordinator

It also introduces new versions of the following APIs:

  • FindCoordinator  - for finding coordinators, to support share coordinators

Access control

This table gives the ACLs required for the new APIs.

RPCOperationResource
ShareGroupHeartbeat ReadGroup
ShareGroupDescribe DescribeGroup
ShareFetch 

Read

Read

Group

Topic

ShareAcknowledge 

Read

Read

Group

Topic

AlterShareGroupOffsets ReadGroup
DeleteShareGroupOffsets ReadGroup
DescribeShareGroupOffsets DescribeGroup
InitializeShareGroupState ClusterActionCluster
ReadShareGroupState ClusterActionCluster
WriteShareGroupState ClusterActionCluster
DeleteShareGroupState ClusterActionCluster
ReadShareGroupStateOffsets ClusterActionCluster

Error codes

This KIP adds the following error codes the Kafka protocol.

  • INVALID_RECORD_STATE (121) - The record state is invalid. The acknowledgement of delivery could not be completed.
  • SHARE_SESSION_NOT_FOUND (122) - The share session is not found.
  • INVALID_SHARE_SESSION_EPOCH (123) - The share session epoch is invalid.
  • FENCED_STATE_EPOCH (124) - The share coordinator rejected the request because share-group state epoch did not match.

FindCoordinator API

The KIP introduces version 6.

Request schema

Version 6 adds the new key type of FindCoordinatorRequest.CoordinatorType.SHARE with value 2 with the key of "group:topicId:partition".

{
  "apiKey": 10,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "FindCoordinatorRequest",
  // Version 1 adds KeyType.
  //
  // Version 2 is the same as version 1.
  //
  // Version 3 is the first flexible version.
  //
  // Version 4 adds support for batching via CoordinatorKeys (KIP-699)
  //
  // Version 5 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
  //
  // Version 6 adds support for share groups (KIP-932).
  "validVersions": "0-6",
  "deprecatedVersions": "0",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "Key", "type": "string", "versions": "0-3",
      "about": "The coordinator key." },
    { "name": "KeyType", "type": "int8", "versions": "1+", "default": "0", "ignorable": false,
      "about": "The coordinator key type. (Group, transaction, share, etc.)" },
    { "name": "CoordinatorKeys", "type": "[]string", "versions": "4+",
      "about": "The coordinator keys." }
  ]
}

Response schema

Version 6 is the same as version 5.

ShareGroupHeartbeat API

The ShareGroupHeartbeat API is used by share group consumers to form a group. The API allows members to advertise their subscriptions and their state. The group coordinator uses it to assign partitions to and revoke partitions from members. This API is also used as a liveness check.

Request schema

The member must set all the top-level fields when it joins for the first time or when an error occurs. Otherwise, it is expected only to fill in the fields which have changed since the last heartbeat.

{
  "apiKey": 76,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareGroupHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+",
      "about": "The member ID generated by the coordinator. The member ID must be kept during the entire lifetime of the member." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The current member epoch; 0 to join the group; -1 to leave the group." },
    { "name": "RackId", "type": "string", "versions": "0+",  "nullableVersions": "0+", "default": "null",
      "about": "null if not provided or if it didn't change since the last heartbeat; the rack ID of consumer otherwise." },
    { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." }
  ]
}

Response schema

The group coordinator will only send the Assignment field when it changes.

{
  "apiKey": 76,
  "type": "response",
  "name": "ShareGroupHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_MEMBER_ID (version 0+)
  // - GROUP_MAX_SIZE_REACHED (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The member ID generated by the coordinator. Only provided when the member joins with MemberEpoch == 0." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." },
    { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
      "about": "The heartbeat interval in milliseconds." },
    { "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not provided; the assignment otherwise.", "fields": [
        { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
          "about": "The partitions assigned to the member." }
    ]}
  ],
  "commonStructs": [
    { "name": "TopicPartitions", "versions": "0+", "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The topic ID." },
        { "name": "Partitions", "type": "[]int32", "versions": "0+",
          "about": "The partitions." }
    ]}
  ]
}

ShareGroupDescribe API

The ShareGroupDescribe API is used to describe share groups.

Request schema

{
  "apiKey": 77,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareGroupDescribeRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId",
      "about": "The ids of the groups to describe" },
    { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "0+",
      "about": "Whether to include authorized operations." }
  ]
}

Response schema

{
  "apiKey": 77,
  "type": "response",
  "name": "ShareGroupDescribeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - INVALID_GROUP_ID (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+",
      "about": "Each described group.",
      "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The describe error, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The top-level error message, or null if there was no error." },
        { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
          "about": "The group ID string." },
        { "name": "GroupState", "type": "string", "versions": "0+",
          "about": "The group state string, or the empty string." },
        { "name": "GroupEpoch", "type": "int32", "versions": "0+",
          "about": "The group epoch." },
        { "name": "AssignmentEpoch", "type": "int32", "versions": "0+",
          "about": "The assignment epoch." },
        { "name": "AssignorName", "type": "string", "versions": "0+",
          "about": "The selected assignor." },
        { "name": "Members", "type": "[]Member", "versions": "0+",
          "about": "The members.",
          "fields": [
            { "name": "MemberId", "type": "string", "versions": "0+",
              "about": "The member ID." },
            { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
              "about": "The member rack ID." },
            { "name": "MemberEpoch", "type": "int32", "versions": "0+",
              "about": "The current member epoch." },
            { "name": "ClientId", "type": "string", "versions": "0+",
              "about": "The client ID." },
            { "name": "ClientHost", "type": "string", "versions": "0+",
              "about": "The client host." },
            { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "entityType": "topicName",
              "about": "The subscribed topic names." },
            { "name": "Assignment", "type": "Assignment", "versions": "0+",
              "about": "The current assignment." }
          ]},
        { "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
          "about": "32-bit bitfield to represent authorized operations for this group." }
      ]
    }
  ],
  "commonStructs": [
    { "name": "TopicPartitions", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic ID." },
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions." }
    ]},
    { "name": "Assignment", "versions": "0+", "fields": [
      { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
        "about": "The assigned topic-partitions to the member." }
    ]}
  ]
}

ShareFetch API

The ShareFetch API is used by share group consumers to fetch acquired records from share-partition leaders. It is also possible to piggyback acknowledgements in this request to reduce the number of round trips.

The first request from a share consumer to a share-partition leader broker establishes a share session by setting MemberId to the member ID it received from the group coordinator and ShareSessionEpoch to 0. Then each subsequent ShareFetch or ShareAcknowledge  request specifies the MemberId  and increments the ShareSessionEpoch  by one. When the share consumer wishes to close the share session, it sets MemberId  to the member ID and ShareSessionEpoch  to -1.

When piggybacking acknowledgements in this request, there are a few special cases.

  • If acknowledgements are being made for a partition and no records should be fetched, PartitionMaxBytes  should be set to zero.
  • If acknowledgements are being made for a partition which is being removed from the share session, the partition is included in the Topics array with PartitionMaxBytes  set to zero AND the partition is included in ForgottenTopicsData .
  • If acknowledgements are being made for a partition in the final request in a share session, the partition is included in the Topics  array and ShareSessionEpoch  is set to -1. No data will be fetched and it is not necessary to include the partition in ForgottenTopicsData .
  • If there's an error which affects all piggybacked acknowledgements but which does not prevent data from being fetched, the AcknowledgeErrorCode  in the response will be set to the same value for all partitions which had piggybacked acknowledgements.

Request schema

For the AcknowledgementBatches of each topic-partition, the FirstOffsets  must be ascending order and the ranges must be non-overlapping.

For each AcknowledgementBatch , the array of AcknowledgeType entries can consist of a single value which applies to all entries in the batch, or a separate value for each offset in the batch.

{
  "apiKey": 78,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareFetchRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The member ID." },
    { "name": "ShareSessionEpoch", "type": "int32", "versions": "0+",
      "about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
    { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
      "about": "The maximum time in milliseconds to wait for the response." },
    { "name": "MinBytes", "type": "int32", "versions": "0+",
      "about": "The minimum bytes to accumulate in the response." },
    { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff", "ignorable": true,
      "about": "The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored." },
    { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "FirstOffset", "type": "int64", "versions": "0+",
            "about": "First offset of batch of records to acknowledge."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+",
            "about": "Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject."}
        ]}
      ]}
    ]},
    { "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "0+", "ignorable": false,
      "about": "The partitions to remove from this share session.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions indexes to forget." }
    ]}
  ]
}

Response schema

{
  "apiKey": 78,
  "type": "response",
  "name": "ShareFetchResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors for ErrorCode and AcknowledgeErrorCode:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
  // - SHARE_SESSION_NOT_FOUND (version 0+)
  // - INVALID_SHARE_SESSION_EPOCH (version 0+) 
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - NOT_LEADER_OR_FOLLOWER (version 0+)
  // - UNKNOWN_TOPIC_ID (version 0+)
  // - INVALID_RECORD_STATE (version 0+) - only for AcknowledgeErrorCode
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - CORRUPT_MESSAGE (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
      "about": "The top-level response error code." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
     { "name": "Responses", "type": "[]ShareFetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The fetch error code, or 0 if there was no fetch error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The fetch error message, or null if there was no fetch error." },
        { "name": "AcknowledgeErrorCode", "type": "int16", "versions": "0+",
          "about": "The acknowledge error code, or 0 if there was no acknowledge error." },
        { "name": "AcknowledgeErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The acknowledge error message, or null if there was no acknowledge error." }, 
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch." }
        ]},
        { "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."},
        { "name": "AcquiredRecords", "type": "[]AcquiredRecords", "versions": "0+", "about": "The acquired records.", "fields":  [
          {"name": "FirstOffset", "type":  "int64", "versions": "0+", "about": "The earliest offset in this batch of acquired records."},
          {"name": "LastOffset", "type": "int64", "versions": "0+", "about": "The last offset of this batch of acquired records."},
          {"name": "DeliveryCount", "type": "int16", "versions": "0+", "about": "The delivery count of this batch of acquired records."}
        ]}
      ]}
    ]},
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "0+",
      "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
      { "name": "NodeId", "type": "int32", "versions": "0+",
        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The node's hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The node's port." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the node, or null if it has not been assigned to a rack." }
    ]}
  ]
}

ShareAcknowledge API

The ShareAcknowledge API is used by share group consumers to acknowledge delivery of records with share-partition leaders.

Request schema

For the AcknowledgementBatches of each topic-partition, the FirstOffsets  must be ascending order and the ranges must be non-overlapping.

For each AcknowledgementBatch , the array of AcknowledgeType entries can consist of a single value which applies to all entries in the batch, or a separate value for each offset in the batch.

{
  "apiKey": 79,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareAcknowledgeRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The member ID." },
    { "name": "ShareSessionEpoch", "type": "int32", "versions": "0+",
      "about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
    { "name": "Topics", "type": "[]AcknowledgeTopic", "versions": "0+",
      "about": "The topics containing records to acknowledge.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]AcknowledgePartition", "versions": "0+",
        "about": "The partitions containing records to acknowledge.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "FirstOffset", "type": "int64", "versions": "0+",
            "about": "First offset of batch of records to acknowledge."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+",
            "about": "Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject."}
        ]}
      ]}
    ]}
  ]
}

Response schema

{
  "apiKey": 79,
  "type": "response",
  "name": "ShareAcknowledgeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - SHARE_SESSION_NOT_FOUND (version 0+)
  // - INVALID_SHARE_SESSION_EPOCH (version 0+) 
  // - NOT_LEADER_OR_FOLLOWER (version 0+)
  // - UNKNOWN_TOPIC_ID (version 0+)
  // - INVALID_RECORD_STATE (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
      "about": "The top level response error code." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "Responses", "type": "[]ShareAcknowledgeTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch." }
        ]}
      ]}
    ]},
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "0+",
      "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
      { "name": "NodeId", "type": "int32", "versions": "0+",
        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The node's hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The node's port." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the node, or null if it has not been assigned to a rack." }
    ]}
  ]
}

AlterShareGroupOffsets API

The AlterShareGroupOffsets API is used to alter the share-partition start offsets for the share-partitions in a share group. The group coordinator serves this API.

Request schema

{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "AlterShareGroupOffsetsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]AlterShareGroupOffsetsRequestTopic", "versions": "0+",
      "about": "The topics to alter offsets for.",  "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]AlterShareGroupOffsetsRequestPartition", "versions": "0+",
        "about": "Each partition to alter offsets for.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset." }
      ]}
    ]}
  ]
}

Response schema

{
  "apiKey": NN,
  "type": "response",
  "name": "AlterShareGroupOffsetsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - GROUP_NOT_EMPTY (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]AlterShareGroupOffsetsResponseTopic", "versions": "0+",
      "about": "The results for each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]AlterShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

DeleteShareGroupOffsets API

The DeleteShareGroupOffsets API is used to delete the offsets for the share-partitions in a share group. The group coordinator serves this API.

Request schema

{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "DeleteShareGroupOffsetsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]DeleteShareGroupOffsetsRequestTopic", "versions": "0+",
      "about": "The topics to delete offsets for.",  "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." }
      ]}
    ]}
  ]
}

Response schema

{
  "apiKey": NN,
  "type": "response",
  "name": "DeleteShareGroupOffsetsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - GROUP_NOT_EMPTY (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]DeleteShareGroupOffsetsResponseTopic", "versions": "0+",
      "about": "The results for each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." }
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code, or 0 if there was no error." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
        "about": "The error message, or null if there was no error." }
    ]}
  ]
}

DescribeShareGroupOffsets API

The DescribeShareGroupOffsets API is used to describe the offsets for the share-partitions in a share group. The group coordinator serves this API.

Request schema

{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "DescribeShareGroupOffsetsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]DescribeShareGroupOffsetsRequestTopic", "versions": "0+",
      "about": "The topics to describe offsets for.",  "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions." }
      ]}
    ]}
  ]
}

Response schema

{
  "apiKey": NN,
  "type": "response",
  "name": "DescribeShareGroupOffsetsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]DescribeShareGroupOffsetsResponseTopic", "versions": "0+",
      "about": "The results for each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]DescribeShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset."},
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

InitializeShareGroupStateAPI

The InitializeShareGroupState API is used by the group coordinator to initialize the share-partition state. This is an inter-broker RPC authorized as a cluster action.

Request schema

{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "InitializeShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]InitializeStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch of the share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset, or -1 if the start offset is not being initialized." }
      ]}
    ]}
  ]
}

Response schema

{
  "apiKey": NN,
  "type": "response",
  "name": "InitializeShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]InitializeStateResult", "versions": "0+",
      "about": "The initialization results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about": "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

ReadShareGroupState API

The ReadShareGroupState API is used by share-partition leaders to read share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ReadShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]ReadStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about", "The leader epoch of the share-partition." }
      ]}
    ]}
  ]
}

Response schema

{
  "apiKey": NN,
  "type": "response",
  "name": "ReadShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_LEADER_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]ReadStateResult", "versions": "0+",
      "about": "The read results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about": "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch of the share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset, which can be -1 if it is not yet initialized." },
        { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+", "fields":[
          { "name": "FirstOffset", "type": "int64", "versions": "0+",
            "about": "The first offset of this state batch." },
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "The last offset of this state batch." },
          { "name": "DeliveryState", "type": "int8", "versions": "0+",
            "about": "The delivery state - 0:Available,2:Acked,4:Archived." },
          { "name": "DeliveryCount", "type": "int16", "versions": "0+",
            "about": "The delivery count." }
        ]}
      ]}
    ]}
  ]
}

WriteShareGroupState API

The WriteShareGroupState API is used by share-partition leaders to write share-partition state to a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "WriteShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]WriteStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch of the share-partition." },
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The leader epoch of the share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset, or -1 if the start offset is not being written." },
        { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+", "fields": [
          { "name": "FirstOffset", "type": "int64", "versions": "0+",
            "about": "The first offset of this state batch." },
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "The last offset of this state batch." },
          { "name": "DeliveryState", "type": "int8", "versions": "0+",
            "about": "The delivery state - 0:Available,2:Acked,4:Archived" },
          { "name": "DeliveryCount", "type": "int16", "versions": "0+",
            "about": "The delivery count." }
        ]}
      ]}
    ]}
  ]
}

Response schema

{
  "apiKey": NN,
  "type": "response",
  "name": "WriteShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_LEADER_EPOCH (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]WriteStateResult", "versions": "0+",
      "about": "The write results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about": "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

DeleteShareGroupState API

The DeleteShareGroupState API is used by the group coordinator to delete share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "DeleteShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]DeleteStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." }
      ]}
    ]}
  ]
}

Response schema

{
  "apiKey": NN,
  "type": "response",
  "name": "DeleteShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]DeleteStateResult", "versions": "0+",
      "about": "The delete results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about": "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

ReadShareGroupStateSummary API

The ReadShareGroupStateSummary API is used by the group coordinator to read a summary of the share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ReadShareGroupStateSummaryRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]ReadStateSummaryData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The leader epoch of the share-partition." }
      ]}
    ]}
  ]
}

Response schema

{
  "apiKey": NN,
  "type": "response",
  "name": "ReadShareGroupStateSummaryResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_LEADER_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]ReadStateSummaryResult", "versions": "0+",
      "about": "The read results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about": "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch of the share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset." }
      ]}
    ]}
  ]
}

Records

This section describes the new record types.

Group and assignment metadata

In order to coexist properly with consumer groups, the group metadata records for share groups are persisted by the group coordinator to the compacted __consumer_offsets  topic.

With the exception of the ShareGroupStatePartitionMetadata record, all of the record types are analogous to those introduced in KIP-848 for consumer groups.

For each share group, a ShareGroupMetadata record is written for the group epoch, and a ShareGroupPartitionMetadata record is written for the topic-partitions being assigned by the group. When the group is deleted, a tombstone records are written.

For each member, there is a ShareGroupMemberMetadata record which enables group membership to continue seamlessly across a group coordinator change. When the member leaves, a tombstone record is written.

For each share group, a ShareGroupTargetAssignmentMetadata record is written to record the group epoch used to compute the assignment. For each member, there is a ShareGroupTargetAssignmentMember record which persists the target assignment, and a ShareGroupCurrentMemberAssignment record which persists the current assignment and is also used to keep track of the member epoch.

There is also a ShareGroupStatePartitionMetadata record which contains a list of all share-partitions which have been assigned in the share group. It is used to keep track of which share-partitions have persistent state.

ShareGroupMetadataKey

{
  "type": "data",
  "name": "ShareGroupMetadataKey",
  "validVersions": "11",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "11",
      "about": "The group id." }
  ]
}

ShareGroupMetadataValue

{
  "type": "data",
  "name": "ShareGroupMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Epoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." }
  ]
}

ShareGroupPartitionMetadataKey

{
  "type": "data",
  "name": "ShareGroupPartitionMetadataKey",
  "validVersions": "9",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "9",
      "about": "The group id." }
  ]
}

ShareGroupPartitionMetadataValue

{
  "type": "data",
  "name": "ShareGroupPartitionMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "versions": "0+", "type": "[]TopicMetadata",
      "about": "The list of topic metadata.", "fields": [
      { "name": "TopicId", "versions": "0+", "type": "uuid",
        "about": "The topic id." },
      { "name": "TopicName", "versions": "0+", "type": "string",
        "about": "The topic name." },
      { "name": "NumPartitions", "versions": "0+", "type": "int32",
        "about": "The number of partitions of the topic." },
      { "name": "PartitionMetadata", "versions": "0+", "type": "[]PartitionMetadata",
        "about": "Partitions mapped to a set of racks. If the rack information is unavailable for all the partitions, an empty list is stored", "fields": [
        { "name": "Partition", "versions": "0+", "type": "int32",
          "about": "The partition number." },
        { "name": "Racks", "versions": "0+", "type": "[]string",
          "about": "The set of racks that the partition is mapped to." }
      ]}
    ]}
  ]
}

ShareGroupMemberMetadataKey

{
  "type": "data",
  "name": "ShareGroupMemberMetadataKey",
  "validVersions": "10",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "10",
      "about": "The group id." },
    { "name": "MemberId", "type": "string", "versions": "10",
      "about": "The member id." }
  ]
}

ShareGroupMemberMetadataValue

{
  "type": "data",
  "name": "ShareGroupMemberMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "RackId", "versions": "0+", "nullableVersions": "0+", "type": "string",
      "about": "The (optional) rack id." },
    { "name": "ClientId", "versions": "0+", "type": "string",
      "about": "The client id." },
    { "name": "ClientHost", "versions": "0+", "type": "string",
      "about": "The client host." },
    { "name": "SubscribedTopicNames", "versions": "0+", "type": "[]string",
      "about": "The list of subscribed topic names." }
  ]
}

ShareGroupTargetAssignmentMetadataKey

{
  "type": "data",
  "name": "ShareGroupTargetAssignmentMetadataKey",
  "validVersions": "12",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "12",
      "about": "The group id." }
  ]
}

ShareGroupTargerAssignmentMetadataValue

{
  "type": "data",
  "name": "ShareGroupTargetAssignmentMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "AssignmentEpoch", "versions": "0+", "type": "int32",
      "about": "The assignment epoch." }
  ]
}

ShareGroupTargetAssignmentMemberKey

{
  "type": "data",
  "name": "ShareGroupTargetAssignmentMemberKey",
  "validVersions": "13",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "13",
      "about": "The group id." },
    { "name": "MemberId", "type": "string", "versions": "13",
      "about": "The member id." }
  ]
}

ShareGroupTargetAssignmentMemberValue

{
  "type": "data",
  "name": "ShareGroupTargetAssignmentMemberValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "TopicPartitions", "versions": "0+", "type": "[]TopicPartition",
      "about": "The assigned partitions.", "fields": [
      { "name": "TopicId", "versions": "0+", "type": "uuid" },
      { "name": "Partitions", "versions": "0+", "type": "[]int32" }
    ]}
  ]
}

ShareGroupCurrentMemberAssignmentKey

{
  "type": "data",
  "name": "ShareGroupCurrentMemberAssignmentKey",
  "validVersions": "14",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "14",
      "about": "The group id." },
    { "name": "MemberId", "type": "string", "versions": "14",
      "about": "The member id." }
  ]
}

ShareGroupCurrentMemberAssignmentValue

{
  "type": "data",
  "name": "ShareGroupCurrentMemberAssignmentValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "MemberEpoch", "versions": "0+", "type": "int32",
      "about": "The current member epoch that is expected from the member in the heartbeat request." },
    { "name": "PreviousMemberEpoch", "versions": "0+", "type": "int32",
      "about": "If the last epoch bump is lost before reaching the member, the member will retry with the previous epoch." },
    { "name": "State", "versions": "0+", "type": "int8",
      "about": "The member state. See MemberState for the possible values." },
    { "name": "AssignedPartitions", "versions": "0+", "type": "[]TopicPartitions",
      "about": "The partitions assigned to (or owned by) this member." }
  ],
  "commonStructs": [
    { "name": "TopicPartitions", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic Id." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partition Ids." }
    ]}
  ]
}

ShareGroupStatePartitionMetadataKey

{
  "type": "data",
  "name": "ShareGroupStatePartitionMetadataKey",
  "validVersions": "15",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "15",
      "about": "The group id." }
  ]
}

ShareGroupStatePartitionMetadataValue

{
  "type": "data",
  "name": "ShareGroupPartitionMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "InitializedTopics", "versions": "0+", "type": "[]TopicPartitionsInfo",
      "about": "The topics with initialized share-group state." },
    { "name": "DeletingTopics", "versions": "0+", "type": "[]TopicInfo",
      "about": "The topics whose share-group state is being deleted." }
  ],
  "commonStructs": [
    { "name": "TopicPartitionsInfo", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "TopicName", "type": "string", "versions": "0+",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions." }
    ]},
    { "name": "TopicInfo", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "TopicName", "type": "string", "versions": "0+",
        "about": "The topic name." }
    ]}
  ]
}

Share-group state

These records are written by the share coordinator on the __share_group_state  topic.

ShareSnapshotKey

{
  "type": "data",
  "name": "ShareSnapshotKey",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0",
      "about": "The group id." },
    { "name": "TopicId", "type": "uuid", "versions": "0",
      "about": "The topic id." },
    { "name": "Partition", "type": "int32", "versions": "0",
      "about": "The partition index." }
  ]
}

ShareSnapshotValue

{
  "type": "data",
  "name": "ShareSnapshotValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "SnapshotEpoch", "type": "uint16", "versions": "0",
      "about": "The snapshot epoch." },
    { "name": "StateEpoch", "type": "int32", "versions": "0+",
      "about": "The state epoch of the share-partition." },
    { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
      "about": "The leader epoch of the share-partition." },
    { "name": "StartOffset", "type": "int64", "versions": "0",
      "about": "The share-partition start offset." },
    { "name": "StateBatches", "type": "[]StateBatch", "versions": "0", "fields": [
      { "name": "FirstOffset", "type": "int64", "versions": "0",
        "about": "The first offset of this state batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0",
        "about": "The last offset of this state batch." },
      { "name": "DeliveryState", "type": "int8", "versions": "0",
        "about": "The delivery state - 0:Available,2:Acked,4:Archived" },
      { "name": "DeliveryCount", "type": "int16", "versions": "0",
        "about": "The delivery count." }
    ]} 
  ]
}

ShareUpdateKey

{
  "type": "data",
  "name": "ShareUpdateKey",
  "validVersions": "1",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "1",
      "about": "The group id." },
    { "name": "TopicId", "type": "uuid", "versions": "1",
      "about": "The topic id." },
    { "name": "Partition", "type": "int32", "versions": "1",
      "about": "The partition index." }
  ]
}

ShareUpdateValue

{
  "type": "data",
  "name": "ShareUpdateValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "SnapshotEpoch", "type": "uint16", "versions": "0",
      "about": "The snapshot epoch." },
    { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
      "about": "The leader epoch of the share-partition." },
    { "name": "StartOffset", "type": "int64", "versions": "0",
      "about": "The share-partition start offset, or -1 if the start offset is not being updated." },
    { "name": "StateBatches", "type": "[]StateBatch", "versions": "0", "fields": [
      { "name": "FirstOffset", "type": "int64", "versions": "0",
        "about": "The first offset of this state batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0",
        "about": "The last offset of this state batch." },
      { "name": "DeliveryState", "type": "int8", "versions": "0",
        "about": "The delivery state - 0:Available,2:Acked,4:Archived" },
      { "name": "DeliveryCount", "type": "int16", "versions": "0",
        "about": "The delivery count." }
    ]} 
  ]
}

Metrics

Broker metrics

The following new broker metrics should be added:

Metric Name

Type

Group

Tags

Description

JMX Bean

group-count

Gauge

group-coordinator-metrics

protocol: share

The total number of share groups managed by group coordinator.

kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share 

rebalance (rebalance-rate and rebalance-count)

Meter

group-coordinator-metrics

protocol: share

The total number of share group rebalances count and rate.

kafka.server:type=group-coordinator-metrics,name=rebalance-rate,protocol=share 

kafka.server:type=group-coordinator-metrics,name=rebalance-count,protocol=share 

group-countGaugegroup-coordinator-metrics

protocol: share

state: {Empty|Stable|Dead} 

The number of share groups in respective state.kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share,state={Empty|Stable|Dead} 

share-acknowledgement (share-acknowledgement-rate and share-acknowledgement-count)

Meter

share-group-metrics


The number of acknowledgement requests.

kafka.server:type=share-group-metrics,name=share-acknowledgement-rate

kafka.server:type=share-group-metrics,name=share-acknowledgement-count 

record-acknowledgement (record-acknowledgement-rate and record-acknowledgement-count)

Meter

share-group-metrics

ack-type:{Accept,Release,Reject} 


The number of records acknowledged per acknowledgement type.

kafka.server:type=share-group-metrics,name=record-acknowledgement-rate,ack-type={Accept,Release,Reject} 

kafka.server:type=share-group-metrics,name=record-acknowledgement-count,ack-type={Accept,Release,Reject} 

partition-load-time (partition-load-time-avg and partition-load-time-max)

Meter

share-group-metrics


The time taken to load the share partitions.

kafka.server:type=share-group-metrics,name=partition-load-time-avg

kafka.server:type=share-group-metrics,name=partition-load-time-max  

partition-load-time (partition-load-time-avg and partition-load-time-max)

Meter

share-coordinator-metrics


The time taken in milliseconds to load the share-group state from the share-group state partitions.

kafka.server:type=share-coordinator-metrics,name=partition-load-time-avg 

kafka.server:type=share-coordinator-metrcs,name=partition-load-time-max 

thread-idle-ratio (thread-idle-ratio-min and thread-idle-ratio-avg)

Meter

share-coordinator-metrics


The fraction of time the share coordinator thread is idle.

kafka.server:type=share-coordinator-metrics,name=thread-idle-ratio-min 

kafka.server:type=share-coordinator-metrics,name=thread-idle-ratio-avg 

write (write-rate and write-total)

Meter

share-coordinator-metrics


The number of share-group state write calls per second.

kafka.server:type=share-coordinator-metrics,name=write-rate 

kafka.server:type=share-coordinator-metrics,name=write-total 

write-latency (write-latency-avg and write-latency-max)

Meter

share-coordinator-metrics


The time taken for a share-group state write call, including the time to write to the share-group state topic.

kafka.server:type=share-coordinator-metrics,name=write-latency-avg 

kafka.server:type=share-coordinator-metrics,name=write-latency-max 

num-partitions

Gauge

share-coordinator-metrics


The number of partitions in the share-state topic.

kafka.server:type=share-coordinator-metrics,name=num-partitions 

The group coordinator uses metrics in the group group-coordinator-metrics . The share-partition leader uses metrics in the group share-group-metrics . The share coordinator uses metrics in the group share-coordinator-metrics .

Client metrics

The following new client metrics should be added. Following the terminology in KIP-714, these are standard metrics, as opposed to required metrics.

Metric Name

Type

Group

Tags

Description

JMX Bean

Telemetry metric name (KIP-714)

last-poll-seconds-ago

Gauge

consumer-share-metrics

client-id 

The number of seconds since the last poll() invocation.

kafka.consumer:type=consumer-share-metrics,name=last-poll-seconds-ago,client-id=([-.\w]+) 

org.apache.kafka.consumer.share.last.poll.seconds.ago 

time-between-poll-avg

Meter

consumer-share-metrics

client-id 

The average delay between invocations of poll() in milliseconds.

kafka.consumer:type=consumer-share-metrics,name=time-between-poll-avg,client-id=([-.\w]+) 

o.a.k.consumer.share.time.between.poll.avg 

time-between-poll-max

Meter

consumer-share-metrics

client-id 

The maximum delay between invocations of poll() in milliseconds.

kafka.consumer:type=consumer-share-metrics,name=last-poll-seconds-ago,client-id=([-.\w]+) 

o.a.k.consumer.share.time.between.poll.max 

poll-idle-ratio-avg

Meter

consumer-share-metrics

client-id 

The average fraction of time the consumer's poll() is idle as opposed to waiting for the user code to process records.

kafka.consumer:type=consumer-share-metrics,name=poll-idle-ratio-avg,client-id=([-.\w]+) 

o.a.k.consumer.share.poll.idle.ratio.avg 

heartbeat-response-time-max

Meter

consumer-share-coordinator-metrics

client-id 

The maximum time taken to receive a response to a heartbeat request in milliseconds.

kafka.consumer:type=consumer-share-coordinator-metrics,name=heartbeat-response-time-max,client-id=([-.\w]+) 

o.a.k.consumer.share.coordinator.heartbeat.response.time.max 

heartbeat-rate

Meter

consumer-share-coordinator-metrics

client-id 

The number of heartbeats per second.

kafka.consumer:type=consumer-share-coordinator-metrics,name=heartbeat-rate,client-id=([-.\w]+) 

o.a.k.consumer.share.coordinator.heartbeat.rate 

heartbeat-total

Meter

consumer-share-coordinator-metrics

client-id 

The total number of heartbeats.

kafka.consumer:type=consumer-share-coordinator-metrics,name=heartbeat-total,client-id=([-.\w]+) 

o.a.k.consumer.share.coordinator.heartbeat.total 

last-heartbeat-seconds-ago

Gauge

consumer-share-coordinator-metrics

client-id 

The number of seconds since the last coordinator heartbeat was sent.

kafka.consumer:type=consumer-share-coordinator-metrics,name=last-heartbeat-seconds-ago,client-id=([-.\w]+) 

o.a.k.consumer.share.coordinator.last.heartbeat.seconds.ago 

rebalance-total

Meter

consumer-share-coordinator-metrics

client-id 

The total number of rebalance events.

kafka.consumer:type=consumer-share-coordinator-metrics,name=rebalance-total,client-id=([-.\w]+) 

o.a.k.consumer.share.coordinator.rebalance.total 

rebalance-rate-per-hour

Meter

consumer-share-coordinator-metrics

client-id 

The number of rebalance events per hour.

kafka.consumer:type=consumer-share-coordinator-metrics,name=rebalance-rate-per-hour,client-id=([-.\w]+) 

o.a.k.consumer.share.coordinator.rebalance.rate.per.hour 

fetch-size-avg

Meter

consumer-share-fetch-manager-metrics

client-id 

The average number of bytes fetched per request.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-size-avg,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.fetch.size.avg 

fetch-size-max

Meter

consumer-share-fetch-manager-metrics

client-id 

The maximum number of bytes fetched per request.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-size-max,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.fetch.size.max 

bytes-fetched-rate

Meter

consumer-share-fetch-manager-metrics

client-id 

The average number of bytes fetched per second.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=bytes-fetched-rate,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.bytes.fetched.rate 

bytes-fetched-total

Meter

consumer-share-fetch-manager-metrics

client-id 

The total number of bytes fetched.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=bytes-fetched-total,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.bytes.fetched.total 

records-per-request-avg

Meter

consumer-share-fetch-manager-metrics

client-id 

The average number of records in each request.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=records-per-request-avg,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.records.per.request.avg 

records-per-request-max

Meter

consumer-share-fetch-manager-metrics

client-id 

The maximum number of records in a request.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=records-per-request-max,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.records.per.request.max 

records-fetched-rate

Meter

consumer-share-fetch-manager-metrics

client-id 

The average number of records fetched per second.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=records-fetched-rate,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.records.fetched.rate 

records-fetched-total

Meter

consumer-share-fetch-manager-metrics

client-id 

The total number of records fetched.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=records-fetched-total,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.records.fetched.total 

acknowledgements-send-rate

Meter

consumer-share-fetch-manager-metrics

client-id 

The average number of record acknowledgements sent per second.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=acknowledgements-send-rate,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.acknowledgements.send.rate 

acknowledgements-send-total

Meter

consumer-share-fetch-manager-metrics

client-id 

The total number of record acknowledgements sent.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=acknowledgements-send-total,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.acknowledgements.send.total 

acknowledgements-error-rate

Meter

consumer-share-fetch-manager-metrics

client-id 

The average number of record acknowledgements that resulted in errors per second.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=acknowledgements-error-rate,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.acknowledgements.error.rate 

acknowledgements-error-total

Meter

consumer-share-fetch-manager-metrics

client-id 

The total number of record acknowledgements that resulted in errors.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=acknowledgements-error-total,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.acknowledgements.error.total 

fetch-latency-avg

Meter

consumer-share-fetch-manager-metrics

client-id 

The average time taken for a fetch request.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-latency-avg,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.fetch.latency.avg 

fetch-latency-max

Meter

consumer-share-fetch-manager-metrics

client-id 

The maximum time taken for any fetch request.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-latency-max,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.fetch.latency.max 

fetch-rate

Meter

consumer-share-fetch-manager-metrics

client-id 

The number of fetch requests per second.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-rate,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.fetch.rate 

fetch-total

Meter

consumer-share-fetch-manager-metrics

client-id 

The total number of fetch requests.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-total,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.fetch.total 

fetch-throttle-time-avg

Meter

consumer-share-fetch-manager-metrics

client-id 

The average throttle time in milliseconds.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-throttle-time-avg,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.fetch.throttle.time.avg 

fetch-throttle-time-max

Meter

consumer-share-fetch-manager-metrics

client-id 

The maximum throttle time in milliseconds.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-throttle-time-max,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.fetch.throttle.time.max 

Future Work

There are some obvious extensions to this idea which are not included in this KIP in order to keep the scope manageable.

This KIP introduces delivery counts and a maximum number of delivery attempts. An obvious future extension is the ability to copy records that failed to be delivered onto a dead-letter queue. This would of course give a way to handle poison messages without them permanently blocking processing.

The focus in this KIP is on sharing rather than ordering. The concept can be extended to give key-based ordering so that partial ordering and fine-grained sharing can be achieved at the same time.

For topics in which share groups are the only consumption model, it would be nice to be able to have the SPSO of the share-partitions taken in to consideration when cleaning the log and advancing the log start offset.

It would also be possible to have share-group configuration to control the maximum time-to-live for records and automatically archive them at this time.

Finally, this KIP does not include support for acknowledging delivery using transactions for exactly-once semantics. Conceptually, this is quite straightforward but would take changes to the API.

Compatibility, Deprecation, and Migration Plan

Kafka Broker Migration

As is customary for large KIPs, this KIP will be delivered into Apache Kafka progressively, starting with an Early Access release, and then moving through Preview, and finally General Availability.

Early access and Preview

At these stages, KIP-932 can be used for familiarization and experimentation, but not production use. It is disabled in the default configuration for the cluster, and must be explicitly enabled. Doing so is not appropriate in a production cluster.

To turn on the feature, add "share"  to the group.coordinator.rebalance.protocols  configuration. There is no support for upgrade or downgrade.

General availability

This KIP builds upon KIP-848 and the new group coordinator.

To upgrade a cluster, it is first necessary to perform a rolling upgrade of the cluster to a software version which supports share groups. Then, the new protocol is enabled using the kafka-feature.sh  tool by setting a group.version  which supports share groups. Finally, the group.coordinator.rebalance.protocols  configuration is changed to add "share"  to the list of enabled rebalance protocols.

KIP-848 introduced the new group coordinator and the new records for the __consumer_offsets  topic. The pre-KIP-848 group coordinator will not recognize the new records, so this downgrade is not supported.

Downgrading to a software version that supports the new group coordinator but does not support share groups is supported. KIP-932 adds new records to the __consumer_offsets  topic which will not be understood by the group coordinator. The group coordinator will ignore these records. The __share_group_state  topic will be unused because there will be no share coordinator and can be manually deleted.

Test Plan

The feature will be thoroughly tested with unit, integration and system tests. We will also carry out performance testing both to understand the performance of share groups, and also to understand the impact on brokers with this new feature.

Rejected Alternatives

Share group consumers use KafkaConsumer

In this option, the regular KafkaConsumer  was used by consumers to consume records from a share group, using a configuration parameter group.type  to choose between using a share group or a consumer group. While this means that existing Kafka consumers can trivially make use of share groups, there are some obvious downsides:

  1. An application using KafkaConsumer with a consumer group could be switched to a share group with very different semantics with just a configuration change. There is almost no chance that the application would work correctly.
  2. Libraries such as Kafka Connect which embed Kafka consumers will not work correctly with share groups without code changes beyond changing the configuration. As a result, there is a risk of breaking connectors due to misconfiguration using the group.type  configuration property.
  3. More than half of the KafkaConsumer  methods do not make sense for share groups introducing a lot of unnecessary cruft.

As a result, the KIP now proposes an entirely different class KafkaShareConsumer  which gives a very similar interface as KafkaConsumer  but eliminates the downsides listed above.

Managing durable share-partition state using control records

The share-partition leader is not able to read and write the durable share-partition state itself. Instead it uses inter-broker RPCs to ask the share coordinator to read and write records on an internal metadata topic. An alternative seriously considered was to have the share-partition leader write new types of control records directly onto the user’s topic-partition whose consumption is being managed.

The advantage of using control records is that the share-partition leader is of course also the leader for the user topic-partition meaning that it can both read and write to the partition itself. There’s no inter-broker hop to write acknowledgement information.

The disadvantages are significant. First, control records have always been associated with producing records transactionally, not consuming them. We are mixing (invisible) information about consumption into the stream of events, which either need to be filtered out by all consumers or by the broker. Client-side filtering in the consumers increases the network traffic and processing for the consumers. Broker-side filtering would prevent the broker from achieving zero-copy transfer to the network. Second, the complexity of managing the recovery processing of control records is significant. Every time the leadership of a share-partition changes, it’s necessary to efficiently find the minimal set of control records to process and to process them. Because the control records are mixed in with the user records, this could be difficult to achieve. Third, the share-state control records would be cluster-specific in a similar way as the records on the __consumer_offsets topic. When a topic with share-state control records is replicated to another cluster, the control records would at best be irrelevant on the target cluster, or they would need to be mutated by the replicator before being written to the target.

Managing durable share-partition state using a compacted topic

The share coordinator could use a compacted topic to store its data. However, the data that it writes is unlike the consumer offsets topic in that it is more akin to a log than a keyed data store. Rather than inventing complicates schemes for key uniqueness that prevent the log compactor from deleting required data too early, and as a result putting the log compactor under stress cleaning a large number of keys being written at a high rate, the KIP uses log retention and explicit pruning of old records.

  • No labels