Comment: Updated simple assignor and various small improvements

...

  • It maintains the list of share-group members.

  • It manages the topic-partition assignments for the share-group members using a server-side partition assignor. An initial, trivial implementation would be to give each member the list of all topic-partitions which match its subscriptions and then use the pull-based protocol to fetch records from all partitions. A more sophisticated implementation could use topic-partition load and lag metrics to distribute partitions among the consumers as a kind of autonomous, self-balancing partition assignment, steering more consumers to busier partitions, for example. Alternatively, a push-based fetching scheme could be used. This KIP defines just one assignor, org.apache.kafka.coordinator.group.assignor.SimpleShareAssignor, as described below.

A share-partition is a topic-partition with a subscription in a share group. For a topic-partition subscribed in more than one share group, each share group has its own share-partition.

...

The following diagram illustrates how these pieces are wired together using the new Kafka protocol RPCs introduced by this KIP.

Relationship with consumer groups

...

Because consumer groups and share groups are both created automatically on first use, the type of group that is created depends upon how the group ID was first used. As a result, it is helpful to be able to ensure that a group with a particular ID can only be created with a particular type. This can be achieved by defining a group configuration property group.type, using the kafka-configs.sh tool or the AdminClient.incrementalAlterConfigs method. For example, you could use the following command to ensure that the group ID "G1" is to be used for a share group only.

...

If a regular Kafka consumer then attempts to use "G1" as a consumer group and the group G1  does not exist or is not a consumer group, the exception InconsistentGroupProtocolException will be thrown.

...

Share group membership is controlled by the group coordinator. Consumers in a share group use the heartbeat mechanism to join, leave and confirm continued membership of the share group, using the new ShareGroupHeartbeat RPC. Share-partition assignment is also piggybacked on the heartbeat mechanism. Share groups only support server-side assignors, which implement the new internal org.apache.kafka.coordinator.group.assignor.SharePartitionAssignor interface. This KIP introduces just one implementation of this interface, org.apache.kafka.coordinator.group.assignor.SimpleShareAssignor.

For a share group, a rebalance is a much less significant event than for a consumer group because there’s no fencing. When a partition is assigned to a member of a share group, it’s telling the member that it should fetch records from that partition, which it may well be sharing with the other members of the share group. The members are not aware of each other, and there’s no synchronization barrier or fencing involved. The group coordinator, using the server-side assignor, is responsible for telling the members which partitions they are assigned and revoked. But the aim is to give every member useful work, rather than to keep the members' assignments safely separated.

...

Whenever the group epoch is larger than the target assignment epoch, the group coordinator triggers the computation of a new target assignment based on the latest group metadata using a server-side assignor. For a share group, the group coordinator does not persist the assignment. The assignment epoch becomes the group epoch of the group metadata used to compute the assignment.
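The epoch comparison described above can be illustrated with a minimal sketch. The class and method names are hypothetical and are not part of this KIP; the counter merely stands in for invoking the server-side assignor.

```java
// Hypothetical sketch: names are illustrative only and not defined by this KIP.
final class TargetAssignmentEpochSketch {
    private int groupEpoch = 0;
    private int assignmentEpoch = 0;
    private int computations = 0;

    /** Called when the group metadata changes, for example when a member joins or leaves. */
    void bumpGroupEpoch() {
        groupEpoch++;
    }

    /** Computes a new target assignment only if the group epoch is ahead of the assignment epoch. */
    boolean maybeComputeTargetAssignment() {
        if (groupEpoch > assignmentEpoch) {
            computations++;               // stand-in for running the server-side assignor
            assignmentEpoch = groupEpoch; // the assignment epoch becomes the group epoch used
            return true;
        }
        return false;
    }

    int assignmentEpoch() {
        return assignmentEpoch;
    }

    int computations() {
        return computations;
    }
}
```

Several metadata changes that occur before the next computation collapse into a single new target assignment, because only the latest group epoch is compared.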

...

When the group coordinator fails over, the newly elected coordinator loads the state from the __consumer_offsets  partition. This means a share group will remain in existence across the fail-over. However, the members of the groups and their assignments are not persisted. This means that existing members will have to rejoin the share group following a group coordinator failover.

The SimpleShareAssignor

This KIP introduces org.apache.kafka.coordinator.group.assignor.SimpleShareAssignor . It attempts to balance the number of consumers assigned to the subscribed partitions, and assumes that all consumers are equally capable of consuming records.

  • The assignor hashes the member IDs of the members and maps the partitions assigned to the members based on the hash. This gives approximately even balance.

  • If any partitions were not assigned any members, members are assigned to them round-robin until each partition has at least one member assigned to it.

When the number of members is greater than or equal to the number of partitions for a subscribed topic and every partition was assigned at least one member, adding more members to the group simply assigns more members to the partitions. If, however, any partitions were not assigned any members by hashing, adding more members changes the distribution and will revoke some partitions previously assigned. Because assignments in share groups are looser than they are in consumer groups, the group coordinator doesn't require the members to confirm they have revoked partitions before they can be assigned to other members. For a brief period while the rebalancing is occurring, some partitions may have more members consuming than the assignment requires, but this situation soon resolves as the members receive their updated assignments and stop fetching from the revoked partitions.
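The two steps of the assignor can be sketched for a single topic as follows. This is only an illustration under stated assumptions, not the SimpleShareAssignor implementation: it uses String.hashCode() modulo the partition count as the hash mapping and hands out uncovered partitions round-robin in member ID order, neither of which is specified by this KIP. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch: the hash function and round-robin order are assumptions.
final class SimpleShareAssignorSketch {

    /** Returns member ID -> assigned partition indexes for one subscribed topic. */
    static Map<String, TreeSet<Integer>> assign(List<String> memberIds, int numPartitions) {
        Map<String, TreeSet<Integer>> assignment = new TreeMap<>();
        for (String memberId : memberIds) {
            assignment.computeIfAbsent(memberId, id -> new TreeSet<>());
        }
        if (numPartitions <= 0 || assignment.isEmpty()) {
            return assignment;
        }

        // Step 1: map each member to a partition based on a hash of its member ID.
        boolean[] covered = new boolean[numPartitions];
        for (String memberId : assignment.keySet()) {
            int partition = Math.floorMod(memberId.hashCode(), numPartitions);
            assignment.get(memberId).add(partition);
            covered[partition] = true;
        }

        // Step 2: give every partition without a member to the members in round-robin order.
        List<String> ordered = new ArrayList<>(assignment.keySet());
        int next = 0;
        for (int partition = 0; partition < numPartitions; partition++) {
            if (!covered[partition]) {
                assignment.get(ordered.get(next % ordered.size())).add(partition);
                next++;
            }
        }
        return assignment;
    }
}
```

After both steps every partition has at least one member and every member has at least one partition, which is the property the text relies on.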

Here is an example of how the assignments might change as members join the share group and the number of partitions changes.

| State | Members subscribed to T1 | Topic:partitions | Assignment change | Assignments |
|---|---|---|---|---|
| M1 subscribed to topic T1 with 1 partition | M1 (hash 1) | T1:0 | Assign T1:0 to M1 | M1 → T1:0 |
| M2 joins the group and subscribes to T1 | M1 (hash 1), M2 (hash 2) | T1:0 | Assign T1:0 to M2 | M1 → T1:0; M2 → T1:0 |
| Add 3 partitions to T1 | M1 (hash 1), M2 (hash 2) | T1:0, T1:1, T1:2, T1:3 | Assign T1:1 and T1:3 to M2; Revoke T1:0 from M2; Assign T1:2 to M1 | M1 → T1:0, T1:2; M2 → T1:1, T1:3 |
| M3 joins the group and subscribes to T1 | M1 (hash 1), M2 (hash 2), M3 (hash 3) | T1:0, T1:1, T1:2, T1:3 | Assign T1:2 to M3; Revoke T1:2 from M1 | M1 → T1:0; M2 → T1:1, T1:3; M3 → T1:2 |
| M4 joins the group and subscribes to T1 | M1 (hash 1), M2 (hash 2), M3 (hash 3), M4 (hash 4) | T1:0, T1:1, T1:2, T1:3 | Assign T1:3 to M4; Revoke T1:3 from M2 | M1 → T1:0; M2 → T1:1; M3 → T1:2; M4 → T1:3 |
| M5, M6, M7, M8 join the group and subscribe to T1 | M1 (hash 1), M2 (hash 2), M3 (hash 3), M4 (hash 4), M5 (hash 5), M6 (hash 6), M7 (hash 7), M8 (hash 8) | T1:0, T1:1, T1:2, T1:3 | Assign T1:0 to M5; Assign T1:1 to M6; Assign T1:2 to M7; Assign T1:3 to M8 | M1 → T1:0; M2 → T1:1; M3 → T1:2; M4 → T1:3; M5 → T1:0; M6 → T1:1; M7 → T1:2; M8 → T1:3 |
| M1 subscribes to T2 with 2 partitions | M1 (hash 1), M2 (hash 2), M3 (hash 3), M4 (hash 4), M5 (hash 5), M6 (hash 6), M7 (hash 7), M8 (hash 8) | T1:0, T1:1, T1:2, T1:3; T2:0, T2:1 | Assign T2:0 and T2:1 to M1 | M1 → T1:0, T2:0, T2:1; M2 → T1:1; M3 → T1:2; M4 → T1:3; M5 → T1:0; M6 → T1:1; M7 → T1:2; M8 → T1:3 |
| All members leave except M2 | M2 (hash 2) | T1:0, T1:1, T1:2, T1:3 | Assign T1:0, T1:2, T1:3 to M2 | M2 → T1:0, T1:1, T1:2, T1:3 |

SimpleShareAssignor does not make assignments based on rack IDs. SimpleShareAssignor does not make assignments based on lag or consumer throughput.

Record delivery and acknowledgement

This section describes how records are delivered to consumers and how the consumers acknowledge whether delivery was successful.

In-flight records

For each share-partition, the share group adds some state management for the records being consumed. The starting offset of records which are eligible for consumption is known as the share-partition start offset (SPSO), and the last offset of records which are eligible for consumption is known as the share-partition end offset (SPEO). The records starting at the SPSO and up to the SPEO are known as the in-flight records. So, a share-partition is essentially managing the consumption of the in-flight records.

The SPEO is not necessarily always at the end of the topic-partition and it just advances freely as records are fetched beyond this point. The segment of the topic-partition between the SPSO and the SPEO is a sliding window that moves as records are consumed. The share-partition leader limits the distance between the SPSO and the SPEO. The upper bound is controlled by the broker configuration group.share.record.lock.partition.limit. Unlike existing queuing systems, there’s no “maximum queue depth”, but there is a limit to the number of in-flight records at any point in time.

The records in a share-partition are in one of four states:

| State | Description |
|---|---|
| Available | The record is available for a consumer |
| Acquired | The record has been acquired for a specific consumer, with a time-limited acquisition lock |
| Acknowledged | The record has been processed and acknowledged by a consumer |
| Archived | The record is not available for a consumer |

All records before the SPSO are in Archived state. All records after the SPEO are in Available state, but not yet being delivered to consumers.
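The sliding window and the four record states can be modelled with a small sketch. It rests on several assumptions that this KIP does not state in this form: the class and method names are hypothetical, maxInFlight stands in for group.share.record.lock.partition.limit, nextOffset is an exclusive upper bound standing in for the SPEO, and the SPSO is advanced past any leading Acknowledged or Archived records.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of in-flight record tracking for one share-partition.
final class SharePartitionWindowSketch {
    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    private final TreeMap<Long, State> inFlight = new TreeMap<>();
    private final int maxInFlight; // stands in for group.share.record.lock.partition.limit
    private long startOffset;      // SPSO
    private long nextOffset;       // first offset beyond the in-flight window (assumption)

    SharePartitionWindowSketch(long startOffset, int maxInFlight) {
        this.startOffset = startOffset;
        this.nextOffset = startOffset;
        this.maxInFlight = maxInFlight;
    }

    /** Acquires up to maxRecords records and returns how many were acquired. */
    int acquire(int maxRecords, long logEndOffset) {
        int acquired = 0;
        // Re-deliver records that are already in the window and Available.
        for (Map.Entry<Long, State> entry : inFlight.entrySet()) {
            if (acquired == maxRecords) break;
            if (entry.getValue() == State.AVAILABLE) {
                entry.setValue(State.ACQUIRED);
                acquired++;
            }
        }
        // Extend the window, subject to the limit on in-flight records.
        while (acquired < maxRecords && inFlight.size() < maxInFlight && nextOffset < logEndOffset) {
            inFlight.put(nextOffset++, State.ACQUIRED);
            acquired++;
        }
        return acquired;
    }

    void acknowledge(long offset) { transition(offset, State.ACKNOWLEDGED); advanceStart(); }

    void reject(long offset) { transition(offset, State.ARCHIVED); advanceStart(); }

    void release(long offset) { transition(offset, State.AVAILABLE); }

    private void transition(long offset, State target) {
        if (inFlight.get(offset) != State.ACQUIRED) {
            throw new IllegalStateException("Record " + offset + " is not acquired");
        }
        inFlight.put(offset, target);
    }

    /** Moves the SPSO past leading records that no longer need delivery. */
    private void advanceStart() {
        while (!inFlight.isEmpty()) {
            State first = inFlight.firstEntry().getValue();
            if (first != State.ACKNOWLEDGED && first != State.ARCHIVED) break;
            startOffset = inFlight.pollFirstEntry().getKey() + 1;
        }
    }

    long startOffset() { return startOffset; }

    /** Records before the SPSO are Archived; records beyond the window are Available. */
    State stateOf(long offset) {
        if (offset < startOffset) return State.ARCHIVED;
        return inFlight.getOrDefault(offset, State.AVAILABLE);
    }
}
```

Acknowledging a record in the middle of the window leaves the SPSO where it is; the window only slides once the record at the SPSO itself has been acknowledged or archived.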

...

  • ShareFetch  for fetching records from share-partition leaders, and optionally acknowledging delivery
  • ShareAcknowledge  for acknowledging delivery with share-partition leaders

...

The share-partition leader clearly has to look within the data returned from the replica manager in order to understand the record batches it fetches. This means that records retrieved using a share group are not able to benefit from the zero-copy optimisation.

By iterating over the record batches but not iterating over the individual records within, the share-partition leader is able to understand the log without having to decompress the records. There is one exception to this and that is to do with reading the transaction end markers as described in the next section.

...

Share sessions use memory on the share-partition leader. Each broker that is a share-partition leader has a cache of share sessions. Because a share session is an integral part of how share groups work, as opposed to a performance optimisation in the manner of fetch sessions, the limit is calculated based on share group configurations group.share.max.groups and group.share.max.size. Sessions are evicted if they have been inactive for more than 120,000 milliseconds (2 minutes), but they are not evicted based on a strict limit of cache size. The key of the cache is the pair (GroupId, MemberId).

The sessions are handled as follows:

| RPC request | Request ShareSessionEpoch | Meaning |
|---|---|---|
| ShareFetch (GroupId, MemberId, ShareSessionEpoch) | 0 | This is a full fetch request. It contains the complete set of topic-partitions to fetch. It cannot contain any acknowledgements. If the request contains acknowledgements, fails with error code INVALID_REQUEST. If the session is in the cache, discard the existing session, releasing all acquired records. Create a new session in the cache with epoch 1. |
| ShareFetch (GroupId, MemberId, ShareSessionEpoch) | 1 to Integer.MAX_VALUE inclusive | This is an incremental fetch request. It contains a partial set of topic-partitions to be applied to the set already in the cache. It can contain a set of acknowledgements to perform before returning the fetched data. If the session is not in the cache, fails with error code SHARE_SESSION_NOT_FOUND. If the session is in the cache and the request epoch is incorrect, fails with error code INVALID_SHARE_SESSION_EPOCH. Otherwise, update the set of topic-partitions in the cache, increment the epoch in the cache, process the acknowledgements, and fetch records from the replica manager. |
| ShareFetch (GroupId, MemberId, ShareSessionEpoch) | -1 | This is a final fetch request. It can contain a final set of acknowledgements, but its primary purpose is to close the share session. If the request contains a list of topics to add or forget, fails with error code INVALID_REQUEST. If the session is not in the cache, fails with error code SHARE_SESSION_NOT_FOUND. If the session is in the cache and the request epoch is incorrect, fails with error code INVALID_SHARE_SESSION_EPOCH. Otherwise, process the acknowledgements and remove the share session from the cache. |
| ShareAcknowledge (GroupId, MemberId, ShareSessionEpoch) | 0 | Fails with error code INVALID_SHARE_SESSION_EPOCH. It’s not permitted to create a share session with this request. |
| ShareAcknowledge (GroupId, MemberId, ShareSessionEpoch) | 1 to Integer.MAX_VALUE inclusive | If the session is not in the cache, fails with error code SHARE_SESSION_NOT_FOUND. If the session is in the cache and the request epoch is incorrect, fails with error code INVALID_SHARE_SESSION_EPOCH. Otherwise, process the acknowledgements and increment the epoch in the cache. |
| ShareAcknowledge (GroupId, MemberId, ShareSessionEpoch) | -1 | If the session is not in the cache, fails with error code SHARE_SESSION_NOT_FOUND. If the session is in the cache, process the acknowledgements and remove the share session from the cache. |
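The share session epoch rules listed above can be condensed into a short sketch. It is an illustration only: the class, enum and method names are hypothetical, the cached value is the epoch expected on the next request, wrapping from Integer.MAX_VALUE back to 1 is an assumption, and a request with epoch -1 is treated purely as a close marker without any further epoch comparison, which is a simplification.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of share session epoch validation on a share-partition leader.
final class ShareSessionCacheSketch {
    enum ErrorCode { NONE, INVALID_REQUEST, SHARE_SESSION_NOT_FOUND, INVALID_SHARE_SESSION_EPOCH }

    // Key is the pair (GroupId, MemberId); value is the epoch expected on the next request.
    private final Map<List<String>, Integer> sessions = new HashMap<>();

    ErrorCode shareFetch(String groupId, String memberId, int epoch,
                         boolean hasAcknowledgements, boolean hasTopicChanges) {
        List<String> key = List.of(groupId, memberId);
        if (epoch == 0) {
            // Full fetch: no acknowledgements allowed; any existing session is replaced.
            if (hasAcknowledgements) return ErrorCode.INVALID_REQUEST;
            sessions.put(key, 1);
            return ErrorCode.NONE;
        }
        if (epoch == -1) {
            // Final fetch: may carry acknowledgements but not topic changes.
            if (hasTopicChanges) return ErrorCode.INVALID_REQUEST;
            return close(key);
        }
        return advance(key, epoch);
    }

    ErrorCode shareAcknowledge(String groupId, String memberId, int epoch) {
        List<String> key = List.of(groupId, memberId);
        if (epoch == 0) return ErrorCode.INVALID_SHARE_SESSION_EPOCH; // cannot create a session
        if (epoch == -1) return close(key);
        return advance(key, epoch);
    }

    private ErrorCode close(List<String> key) {
        return sessions.remove(key) == null ? ErrorCode.SHARE_SESSION_NOT_FOUND : ErrorCode.NONE;
    }

    private ErrorCode advance(List<String> key, int epoch) {
        Integer expected = sessions.get(key);
        if (expected == null) return ErrorCode.SHARE_SESSION_NOT_FOUND;
        if (expected != epoch) return ErrorCode.INVALID_SHARE_SESSION_EPOCH;
        // Assumption: the epoch wraps to 1 after Integer.MAX_VALUE so that 0 and -1 stay reserved.
        sessions.put(key, epoch == Integer.MAX_VALUE ? 1 : epoch + 1);
        return ErrorCode.NONE;
    }
}
```

Because ShareFetch and ShareAcknowledge share one epoch per session in this sketch, a client that interleaves the two RPCs has to increment a single counter across both.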

Client programming interface

A new interface KafkaShareConsumer is introduced for consuming from share groups. It looks very similar to KafkaConsumer trimmed down to the methods that apply to share groups.

In order to retain similarity with KafkaConsumer and make it easy for applications to move between the two interfaces, KafkaShareConsumer follows the same threading rules as KafkaConsumer. It is not thread-safe and only one thread at a time may call the methods of KafkaShareConsumer. Unsynchronized access will result in ConcurrentModificationException. The only exception to this rule is KafkaShareConsumer.wakeup() which may be called from any thread.

To join a share group, the client application instantiates a KafkaShareConsumer using the configuration parameter group.id to give the ID of the share group. Then, it uses KafkaShareConsumer.subscribe(Collection<String> topics) to provide the list of topics that it wishes to consume from. The consumer is not allowed to assign partitions itself.

Each call to KafkaShareConsumer.poll(Duration) fetches data from any of the topic-partitions for the topics to which it subscribed. It returns a set of in-flight records acquired for this consumer for the duration of the acquisition lock timeout. For efficiency, the consumer preferentially returns complete record sets with no gaps. The application then processes the records and acknowledges their delivery, either using explicit or implicit acknowledgement. KafkaShareConsumer  works out which style of acknowledgement is being used by the order of calls the application makes. It is not permissible to mix the two styles of acknowledgement.

If the application calls the KafkaShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType) method for any record in the batch, it is using explicit acknowledgement. In this case:

  • The application calls KafkaShareConsumer.commitSync/Async() which commits the acknowledgements to Kafka. If any records in the batch were not acknowledged, they remain acquired and will be presented to the application in response to a future poll.

  • The application calls KafkaShareConsumer.poll(Duration) without committing first, which commits the acknowledgements to Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgement. If any records in the batch were not acknowledged, they remain acquired and will be presented to the application in response to a future poll.

  • The application calls KafkaShareConsumer.close() which attempts to commit any pending acknowledgements and releases any remaining acquired records.

If the application does not call KafkaShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType) for any record in the batch, it is using implicit acknowledgement. In this case:

  • The application calls KafkaShareConsumer.commitSync/Async() which implicitly acknowledges all of the delivered records as processed successfully and commits the acknowledgements to Kafka.

  • The application calls KafkaShareConsumer.poll(Duration) without committing, which also implicitly acknowledges all of the delivered records and commits the acknowledgements to Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgements.

  • The application calls KafkaShareConsumer.close() which releases any acquired records without acknowledgement.
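How the acknowledgement style might be inferred from the order of calls can be shown with a small state machine. This is a sketch under assumptions rather than the KafkaShareConsumer implementation: the class and method names are hypothetical, and it assumes that the style becomes implicit when a delivered batch is followed by a commit or another poll without any acknowledge call.

```java
// Hypothetical sketch of detecting explicit versus implicit acknowledgement.
final class AcknowledgementModeSketch {
    enum Mode { UNKNOWN, EXPLICIT, IMPLICIT }

    private Mode mode = Mode.UNKNOWN;
    private boolean batchOutstanding = false;

    /** Models a poll(Duration) call that returns a non-empty batch. */
    void onPoll() {
        settleOutstandingBatch();
        batchOutstanding = true;
    }

    /** Models a commitSync() or commitAsync() call. */
    void onCommit() {
        settleOutstandingBatch();
    }

    /** Models an acknowledge(ConsumerRecord, AcknowledgeType) call. */
    void onAcknowledge() {
        if (mode == Mode.IMPLICIT) {
            throw new IllegalStateException("Implicit acknowledgement is already in use");
        }
        mode = Mode.EXPLICIT;
    }

    // A batch that reaches a commit or the next poll without any acknowledge call
    // fixes the style as implicit.
    private void settleOutstandingBatch() {
        if (batchOutstanding && mode == Mode.UNKNOWN) {
            mode = Mode.IMPLICIT;
        }
        batchOutstanding = false;
    }

    Mode mode() {
        return mode;
    }
}
```

The IllegalStateException mirrors the documented behaviour of acknowledge when the consumer has already used implicit acknowledgement.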

The KafkaShareConsumer guarantees that the records returned in the ConsumerRecords object for a specific share-partition are in order of increasing offset. For each share-partition, the share-partition leader guarantees that acknowledgements for the records in a batch are performed atomically. This makes error handling significantly more straightforward because there can be one error code per share-partition.

When the share-partition leader receives a request to acknowledge delivery, which can occur as a separate RPC or piggybacked on a request to fetch more records, it checks that the records being acknowledged are still in the Acquired state and acquired by the share group member trying to acknowledge them. If a record had reached its acquisition lock timeout and reverted to Available state, the attempt to acknowledge it will fail with org.apache.kafka.common.errors.TimeoutException, but the record may well be re-acquired for the same consumer and returned to it again.
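The check performed by the share-partition leader can be sketched as below. The names are hypothetical, time is passed in explicitly to keep the example deterministic, and a boolean result stands in for the error that the real protocol would return.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of acquisition lock checking when an acknowledgement arrives.
final class AcquisitionLockSketch {
    private static final class Lock {
        final String memberId;
        final long expiresAtMs;

        Lock(String memberId, long expiresAtMs) {
            this.memberId = memberId;
            this.expiresAtMs = expiresAtMs;
        }
    }

    private final Map<Long, Lock> acquired = new HashMap<>();
    private final long lockTimeoutMs;

    AcquisitionLockSketch(long lockTimeoutMs) {
        this.lockTimeoutMs = lockTimeoutMs;
    }

    /** Moves a record into Acquired state for the given member and starts the lock timeout. */
    void acquire(long offset, String memberId, long nowMs) {
        acquired.put(offset, new Lock(memberId, nowMs + lockTimeoutMs));
    }

    /** Returns true only if the record is still acquired by the acknowledging member. */
    boolean acknowledge(long offset, String memberId, long nowMs) {
        Lock lock = acquired.get(offset);
        if (lock == null) {
            return false; // not in Acquired state
        }
        if (nowMs >= lock.expiresAtMs) {
            acquired.remove(offset); // lock timed out, record reverts to Available
            return false;
        }
        if (!lock.memberId.equals(memberId)) {
            return false; // acquired by a different member
        }
        acquired.remove(offset);
        return true;
    }
}
```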

Handling bad records

There are several ways in which a "bad" record might manifest and each is handled in a different way.

If a record was delivered to a consumer normally but it could not be processed, the consumer can use the KafkaShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType) method to release or reject the record, depending on whether it is treated as a transient or permanent error.

If a record was delivered to a consumer but could not be deserialized, the KafkaShareConsumer.poll(Duration) method throws an org.apache.kafka.common.errors.RecordDeserializationException  which contains the record's partition and offset information. The KafkaShareConsumer  automatically releases the record, although the application can override this if it is using explicit acknowledgement. The next call to KafkaShareConsumer.poll(Duration)  skips over the bad record so progress continues to be made.

If the consumer is configured to check CRCs (which is the default using the check.crcs configuration property) and a record batch fails its CRC check, KafkaShareConsumer.poll(Duration) throws an org.apache.kafka.common.errors.CorruptRecordException. The KafkaShareConsumer automatically rejects the entire record batch, meaning that the record batch is not redelivered. The next call to KafkaShareConsumer.poll(Duration) skips over the bad record batch so progress continues to be made.

Acknowledgement commit callback

Acknowledgement errors are delivered to a new kind of callback called an acknowledgement commit callback which can optionally be registered with a KafkaShareConsumer.

  • If the application uses KafkaShareConsumer.commitSync() to commit its acknowledgements, the results of the acknowledgements are returned to the application.

  • If the application uses KafkaShareConsumer.commitAsync() or KafkaShareConsumer.poll(Duration) to commit its acknowledgements, the results of the acknowledgements are only delivered if there is an acknowledgement commit callback registered.

The acknowledgement commit callback is called on the application thread and it is not permitted to call the methods of KafkaShareConsumer, with the exception of KafkaShareConsumer.wakeup().

...

Behind the scenes, the KafkaShareConsumer fetches records from the share-partition leader. The leader selects the records in Available state, and returns complete record batches (https://kafka.apache.org/documentation/#recordbatch) if possible. It moves the records into Acquired state, increments the delivery count, starts the acquisition lock timeout, and returns them to the KafkaShareConsumer. Then the KafkaShareConsumer keeps a map of the state of the records it has fetched and returns a batch to the application.

...

Code Block
languagejava
@InterfaceStability.Evolving
public interface ShareConsumer<K, V> {

    /**
     * Get the current subscription. Will return the same topics used in the most recent call to
     * {@link #subscribe(Collection)}, or an empty set if no such call has been made.
     *
     * @return The set of topics currently subscribed to
     */
    Set<String> subscription();

    /**
     * Subscribe to the given list of topics to get dynamically assigned partitions.
     * <b>Topic subscriptions are not incremental. This list will replace the current
     * assignment, if there is one.</b> If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
     *
     * <p>
     * As part of group management, the coordinator will keep track of the list of consumers that belong to a particular
     * group and will trigger a rebalance operation if any one of the following events are triggered:
     * <ul>
     * <li>A member joins or leaves the share group
     * <li>An existing member of the share group is shut down or fails
     * <li>The number of partitions changes for any of the subscribed topics
     * <li>A subscribed topic is created or deleted
     * </ul>
     *
     * @param topics The list of topics to subscribe to
     *
     * @throws IllegalArgumentException If topics is null or contains null or empty elements
     * @throws KafkaException for any other unrecoverable errors
     */
    void subscribe(Collection<String> topics);

    /**
     * Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)}.
     *
     * @throws KafkaException for any other unrecoverable errors
     */
    void unsubscribe();

    /**
     * Fetch data for the topics specified using {@link #subscribe(Collection)}. It is an error to not have
     * subscribed to any topics before polling for data.
     *
     * <p>
     * This method returns immediately if there are records available. Otherwise, it will await the passed timeout.
     * If the timeout expires, an empty record set will be returned.
     *
     * @param timeout The maximum time to block (must not be greater than {@link Long#MAX_VALUE} milliseconds)
     *
     * @return map of topic to records since the last fetch for the subscribed list of topics
     *
     * @throws AuthenticationException if authentication fails. See the exception for more details
     * @throws AuthorizationException if caller lacks Read access to any of the subscribed
     *             topics or to the configured groupId. See the exception for more details
     * @throws InterruptException if the calling thread is interrupted before or while this method is called
     * @throws InvalidTopicException if the current subscription contains any invalid
     *             topic (per {@link org.apache.kafka.common.internals.Topic#validate(String)})
     * @throws WakeupException if {@link #wakeup()} is called before or while this method is called
     * @throws KafkaException for any other unrecoverable errors (e.g. invalid groupId or
     *             session timeout, errors deserializing key/value pairs,
     *             or any new error cases in future versions)
     * @throws IllegalArgumentException if the timeout value is negative
     * @throws IllegalStateException if the consumer is not subscribed to any topics
     * @throws ArithmeticException if the timeout is greater than {@link Long#MAX_VALUE} milliseconds.
     */
    ConsumerRecords<K, V> poll(Duration timeout);

    /**
     * Acknowledge successful delivery of a record returned on the last {@link #poll(Duration)} call.
     * The acknowledgement is committed on the next {@link #commitSync()}, {@link #commitAsync()} or
     * {@link #poll(Duration)} call.
     *
     * <p>
     * Records for each topic-partition must be acknowledged in the order they were returned from
     * {@link #poll(Duration)}. By using this method, the consumer is using
     * <b>explicit acknowledgement</b>.
     *
     * @param record The record to acknowledge
     *
     * @throws IllegalArgumentException if the record being acknowledged doesn't meet the ordering requirement
     * @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer has already
     *                               used implicit acknowledgement
     */
    void acknowledge(ConsumerRecord<K, V> record);

    /**
     * Acknowledge delivery of a record returned on the last {@link #poll(Duration)} call indicating whether
     * it was processed successfully. The acknowledgement is committed on the next {@link #commitSync()},
     * {@link #commitAsync()} or {@link #poll(Duration)} call. By using this method, the consumer is using
     * <b>explicit acknowledgement</b>.
     *
     * <p>
     * Records for each topic-partition must be acknowledged in the order they were returned from
     * {@link #poll(Duration)}.
     *
     * @param record The record to acknowledge
     * @param type The acknowledge type which indicates whether it was processed successfully
     *
     * @throws IllegalArgumentException if the record being acknowledged doesn't meet the ordering requirement
     * @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer has already
     *                               used implicit acknowledgement
     */
    void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type);

    /**
     * Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement,
     * the acknowledgements to commit have been indicated using {@link #acknowledge(ConsumerRecord)} or
     * {@link #acknowledge(ConsumerRecord, AcknowledgeType)}. If the consumer is using implicit acknowledgement,
     * all the records returned by the latest call to {@link #poll(Duration)} are acknowledged.
     * <p>
     * This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is
     * encountered (in which case it is thrown to the caller), or the timeout expires.
     *
     * @return A map of the results for each topic-partition for which delivery was acknowledged.
     *         If the acknowledgement failed for a topic-partition, an exception is present.
     *
     * @throws InterruptException If the thread is interrupted while blocked.
     * @throws KafkaException for any other unrecoverable errors
     */
    Map<TopicIdPartition, Optional<KafkaException>> commitSync();

    /**
     * Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement,
     * the acknowledgements to commit have been indicated using {@link #acknowledge(ConsumerRecord)} or
     * {@link #acknowledge(ConsumerRecord, AcknowledgeType)}. If the consumer is using implicit acknowledgement,
     * all the records returned by the latest call to {@link #poll(Duration)} are acknowledged.
     * <p>
     * This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is
     * encountered (in which case it is thrown to the caller), or the timeout expires.
     *
     * @param timeout The maximum amount of time to await completion of the acknowledgement
     *
     * @return A map of the results for each topic-partition for which delivery was acknowledged.
     *         If the acknowledgement failed for a topic-partition, an exception is present.
     *
     * @throws IllegalArgumentException If the {@code timeout} is negative.
     * @throws InterruptException If the thread is interrupted while blocked.
     * @throws KafkaException for any other unrecoverable errors
     */
    Map<TopicIdPartition, Optional<KafkaException>> commitSync(Duration timeout);

    /**
     * Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement,
     * the acknowledgements to commit have been indicated using {@link #acknowledge(ConsumerRecord)} or
     * {@link #acknowledge(ConsumerRecord, AcknowledgeType)}. If the consumer is using implicit acknowledgement,
     * all the records returned by the latest call to {@link #poll(Duration)} are acknowledged.
     *
     * @throws KafkaException for any other unrecoverable errors
     */
    void commitAsync();

    /**
     * Sets the acknowledgement commit callback which can be used to handle acknowledgement completion.
     *
     * @param callback The acknowledgement commit callback
     */
    void setAcknowledgementCommitCallback(AcknowledgementCommitCallback callback);

    /**
     * Determines the client's unique client instance ID used for telemetry. This ID is unique to
     * this specific client instance and will not change after it is initially generated.
     * The ID is useful for correlating client operations with telemetry sent to the broker and
     * to its eventual monitoring destinations.
     * <p>
     * If telemetry is enabled, this will first require a connection to the cluster to generate
     * the unique client instance ID. This method waits up to {@code timeout} for the consumer
     * client to complete the request.
     * <p>
     * Client telemetry is controlled by the {@link ConsumerConfig#ENABLE_METRICS_PUSH_CONFIG}
     * configuration option.
     *
     * @param timeout The maximum time to wait for consumer client to determine its client instance ID.
     *                The value must be non-negative. Specifying a timeout of zero means do not
     *                wait for the initial request to complete if it hasn't already.
     *
     * @return The client's assigned instance id used for metrics collection.
     *
     * @throws IllegalArgumentException If the {@code timeout} is negative.
     * @throws IllegalStateException If telemetry is not enabled because config `{@code enable.metrics.push}`
     *                               is set to `{@code false}`.
     * @throws InterruptException If the thread is interrupted while blocked.
     * @throws KafkaException If an unexpected error occurs while trying to determine the client
     *                        instance ID, though this error does not necessarily imply the
     *                        consumer client is otherwise unusable.
     */
    Uuid clientInstanceId(Duration timeout);

    /**
     * Get the metrics kept by the consumer
     */
    Map<MetricName, ? extends Metric> metrics();

    /**
     * Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup.
     * This will commit acknowledgements if possible within the default timeout.
     * See {@link #close(Duration)} for details. Note that {@link #wakeup()} cannot be used to interrupt close.
     *
     * @throws InterruptException If the thread is interrupted before or while this method is called
     * @throws KafkaException for any other error during close
     */
    void close();

    /**
     * Tries to close the consumer cleanly within the specified timeout. This method waits up to
     * {@code timeout} for the consumer to complete acknowledgements and leave the group.
     * If the consumer is unable to complete acknowledgements and gracefully leave the group
     * before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be
     * used to interrupt close.
     *
     * @param timeout The maximum time to wait for consumer to close gracefully. The value must be
     *                non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
     *
     * @throws IllegalArgumentException If the {@code timeout} is negative.
     * @throws InterruptException If the thread is interrupted before or while this method is called
     * @throws KafkaException for any other error during close
     */
    void close(Duration timeout);

    /**
     * Wake up the consumer. This method is thread-safe and is useful in particular to abort a long poll.
     * The thread which is blocking in an operation will throw {@link WakeupException}.
     * If no thread is blocking in a method which can throw {@link WakeupException},
     * the next call to such a method will raise it instead.
     */
    void wakeup();
}

...

Method signature | Description
KafkaShareConsumer(Map<String, Object> configs) | Constructor
KafkaShareConsumer(Properties properties) | Constructor
KafkaShareConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) | Constructor
KafkaShareConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) | Constructor

...

AcknowledgementCommitCallback

The new org.apache.kafka.clients.consumer.AcknowledgementCommitCallback can be implemented by the user to execute when acknowledgement completes. It is called on the application thread and is not permitted to call the methods of KafkaShareConsumer, with the exception of KafkaShareConsumer.wakeup().

...

Configuration | Description | Values
group.share.isolation.level 

Controls how to read records written transactionally. If set to "read_committed", the share group will only deliver transactional records which have been committed. If set to "read_uncommitted", the share group will return all messages, even transactional messages which have been aborted. Non-transactional records will be returned unconditionally in either mode.

Valid values "read_committed"  and "read_uncommitted" (default)

group.share.auto.offset.reset 

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server:

  • "earliest" : automatically reset the offset to the earliest offset

  • "latest" : automatically reset the offset to the latest offset

Valid values "latest"  (default) and "earliest" 

group.share.record.lock.duration.ms 

Record acquisition lock duration in milliseconds.

null, which uses the cluster configuration group.share.record.lock.duration.ms, minimum 1000, maximum limited by the cluster configuration group.share.record.lock.duration.max.ms

group.type 

Ensures that a newly created group has the specified group type.

Valid values: "consumer" or "share"; there is no default

Consumer configuration

The existing consumer configurations apply for share groups with the following exceptions:

...

The ShareFetch API is used by share group consumers to fetch acquired records from share-partition leaders. It is also possible to piggyback acknowledgements in this request to reduce the number of round trips.

The first request from a share consumer to a share-partition leader broker establishes a share session by setting MemberId to the member ID it received from the group coordinator and ShareSessionEpoch to 0. Then each subsequent ShareFetch or ShareAcknowledge request specifies the MemberId and increments the ShareSessionEpoch by one. When the share consumer wishes to close the share session, it sets MemberId to the member ID and ShareSessionEpoch to -1.
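
The epoch sequence described above can be illustrated with a small, self-contained sketch. The class and method names (ShareSessionEpochSketch, nextRequestEpoch, closeEpoch) are hypothetical and are not part of this KIP or of the Kafka client; the sketch only models the counter that a client would place in the ShareSessionEpoch field, and it assumes nothing about how a real client handles integer wraparound or session errors.

```java
// Hypothetical model of the ShareSessionEpoch values a share consumer sends
// to one share-partition leader. Not part of the Kafka client API.
class ShareSessionEpochSketch {
    static final int INITIAL_EPOCH = 0; // opens the share session
    static final int FINAL_EPOCH = -1;  // closes the share session

    private int epoch = INITIAL_EPOCH;
    private boolean closed = false;

    // Epoch for the next ShareFetch or ShareAcknowledge request.
    // The first call returns 0; each later call returns the previous value plus one.
    // Wraparound at Integer.MAX_VALUE is deliberately not handled in this sketch.
    int nextRequestEpoch() {
        if (closed) {
            throw new IllegalStateException("share session already closed");
        }
        return epoch++;
    }

    // Epoch for the final request in the share session.
    int closeEpoch() {
        closed = true;
        return FINAL_EPOCH;
    }

    public static void main(String[] args) {
        ShareSessionEpochSketch session = new ShareSessionEpochSketch();
        System.out.println("open:  " + session.nextRequestEpoch());
        System.out.println("next:  " + session.nextRequestEpoch());
        System.out.println("next:  " + session.nextRequestEpoch());
        System.out.println("close: " + session.closeEpoch());
    }
}
```

In this model the MemberId is omitted because it stays constant for the lifetime of the session; only the epoch changes from request to request.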

When piggybacking acknowledgements in this request, there are a few special cases.

  • If acknowledgements are being made for a partition and no records should be fetched, PartitionMaxBytes  should be set to zero.
  • If acknowledgements are being made for a partition which is being removed from the share session, the partition is included in the Topics array with PartitionMaxBytes  set to zero AND the partition is included in ForgottenTopicsData .
  • If acknowledgements are being made for a partition in the final request in a share session, the partition is included in the Topics  array and ShareSessionEpoch  is set to -1. No data will be fetched and it is not necessary to include the partition in ForgottenTopicsData .
  • If there's an error which affects all piggybacked acknowledgements but which does not prevent data from being fetched, the AcknowledgeErrorCode  in the response will be set to the same value for all partitions which had piggybacked acknowledgements.

Request schema

For the AcknowledgementBatches of each topic-partition, the BaseOffsets must be in ascending order and the ranges must be non-overlapping.

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareFetchRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The member ID." },
    { "name": "ShareSessionEpoch", "type": "int32", "versions": "0+",
      "about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
    { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
      "about": "The maximum time in milliseconds to wait for the response." },
    { "name": "MinBytes", "type": "int32", "versions": "0+",
      "about": "The minimum bytes to accumulate in the response." },
    { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff", "ignorable": true,
      "about": "The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored." },
    { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "BaseOffset", "type": "int64", "versions": "0+",
            "about": "Base offset of batch of records to acknowledge."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "GapOffsets", "type": "[]int64", "versions": "0+",
            "about": "Array of offsets in this range which do not correspond to records."},
          { "name": "AcknowledgeType", "type": "int8", "versions": "0+", "default": "0",
            "about": "The type of acknowledgement - 0:Accept,1:Release,2:Reject."}
        ]}
      ]}
    ]},
    { "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "0+", "ignorable": false,
      "about": "The partitions to remove from this share session.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions indexes to forget." }
    ]}
  ]
}
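
The ordering rule for AcknowledgementBatches stated before the schema (ascending BaseOffsets, non-overlapping ranges) can be checked with a few lines of code. The following is a minimal sketch under stated assumptions: the class AcknowledgementBatchCheck and its nested Batch type are hypothetical stand-ins for the BaseOffset and LastOffset fields of the schema, and the sketch says nothing about how a broker actually validates or rejects a request.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Kafka: checks the ordering rule for the
// AcknowledgementBatches of a single topic-partition.
class AcknowledgementBatchCheck {

    // Stand-in for one AcknowledgementBatch; LastOffset is inclusive, as in the schema.
    static final class Batch {
        final long baseOffset;
        final long lastOffset;

        Batch(long baseOffset, long lastOffset) {
            this.baseOffset = baseOffset;
            this.lastOffset = lastOffset;
        }
    }

    // Returns true if every batch has baseOffset <= lastOffset and each batch
    // starts strictly after the previous batch's inclusive lastOffset. Together
    // these imply ascending BaseOffsets and non-overlapping ranges.
    static boolean isWellFormed(List<Batch> batches) {
        Batch previous = null;
        for (Batch batch : batches) {
            if (batch.lastOffset < batch.baseOffset) {
                return false; // empty or inverted range
            }
            if (previous != null && batch.baseOffset <= previous.lastOffset) {
                return false; // out of order or overlapping
            }
            previous = batch;
        }
        return true;
    }

    public static void main(String[] args) {
        List<Batch> ordered = Arrays.asList(new Batch(0, 4), new Batch(5, 9));
        List<Batch> overlapping = Arrays.asList(new Batch(0, 5), new Batch(5, 9));
        System.out.println(isWellFormed(ordered));
        System.out.println(isWellFormed(overlapping));
    }
}
```

Because LastOffset is inclusive, two batches that share an offset (for example 0 to 5 followed by 5 to 9) count as overlapping in this sketch.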

...