Status
Current state: Accepted
Discussion thread: here
Voting thread: here
JIRA: KAFKA-19742
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
KIP-932 introduced queueing semantics in Kafka with new group and consumer types: share groups and share consumers. Multiple share consumers in a share group can subscribe to user topics and consume data cooperatively, without the number of consumers being limited by the number of partitions.
A share consumer can operate in implicit or explicit mode. In implicit mode, the acknowledgement type is fixed to ACCEPT. In explicit mode, each record must be acknowledged explicitly. The acknowledgement types are:
- ACCEPT - indicates the record was consumed successfully, causing it to be acknowledged on the server and never re-delivered to other consumers.
- REJECT - indicates the record was not consumed successfully, causing it to be archived on the server and never re-delivered to other consumers.
- RELEASE - indicates the record was not consumed successfully, causing it to be moved to the Available state, making it eligible for re-delivery.
When a share consumer polls a batch of records, the server attaches an acquisition lock timeout task to the batch. If the batch is not acknowledged before the acquisition lock expires, it transitions to the Available state and becomes eligible for re-delivery (unless the delivery count limit is exceeded). The broker cancels and clears the acquisition lock timeout task on ACCEPT or REJECT. On RELEASE, the record moves to the Available state, and a new acquisition lock timeout task is started on the next delivery attempt.
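To make this lifecycle concrete, here is a minimal Java model of one record's broker-side state before RENEW exists. All names are hypothetical, the lock timeout task is reduced to a boolean flag, and treating RELEASE like lock expiry with respect to the delivery count limit is an assumption; this is a sketch, not Kafka's SharePartition implementation.

```java
/** Hypothetical, simplified broker-side states for one in-flight record (not Kafka's SharePartition code). */
enum SketchRecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

/** Pre-KIP-1222 acknowledgement types plus a synthetic event for acquisition lock expiry. */
enum SketchAckEvent { ACCEPT, RELEASE, REJECT, LOCK_EXPIRED }

final class InFlightRecord {
    private final int maxDeliveryCount; // stands in for the share group's delivery count limit
    private SketchRecordState state = SketchRecordState.AVAILABLE;
    private int deliveryCount = 0;
    private boolean lockTaskActive = false; // is an acquisition lock timeout task pending?

    InFlightRecord(int maxDeliveryCount) { this.maxDeliveryCount = maxDeliveryCount; }

    /** A poll acquires the record, counts a delivery attempt and starts a lock timeout task. */
    void acquire() {
        if (state != SketchRecordState.AVAILABLE) throw new IllegalStateException("not available: " + state);
        state = SketchRecordState.ACQUIRED;
        deliveryCount++;
        lockTaskActive = true;
    }

    /** Applies an acknowledgement, or the lock expiry, to an acquired record. */
    void on(SketchAckEvent event) {
        if (state != SketchRecordState.ACQUIRED) throw new IllegalStateException("not acquired: " + state);
        lockTaskActive = false; // each of these events ends the current lock task
        switch (event) {
            case ACCEPT -> state = SketchRecordState.ACKNOWLEDGED;
            case REJECT -> state = SketchRecordState.ARCHIVED;
            // Assumption: RELEASE is subject to the same delivery count limit as lock expiry.
            case RELEASE, LOCK_EXPIRED -> state = deliveryCount >= maxDeliveryCount
                    ? SketchRecordState.ARCHIVED
                    : SketchRecordState.AVAILABLE;
        }
    }

    SketchRecordState state() { return state; }
    int deliveryCount() { return deliveryCount; }
    boolean lockTaskActive() { return lockTaskActive; }
}
```

Every event in this model moves the record out of ACQUIRED, so there is no way to keep the record acquired while extending its lock.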
If a user application using a share consumer in explicit mode processes a record for a long time, it cannot use any acknowledgement type to affect the server acquisition lock timeout without changing the record state. If processing exceeds the timeout duration, the acquisition lock expires and the record is re-delivered, which may be undesirable.
This KIP proposes a new acknowledgement type, RENEW, for explicit mode. It lets a share consumer request the broker to renew the acquisition lock timeout, thereby extending the current delivery attempt without changing the server's record state.
Public Interfaces
- A new entry, RENEW, with id 4 will be added to the org/apache/kafka/clients/consumer/AcknowledgeType.java enum.
public enum AcknowledgeType {
/** The record was consumed successfully. */
ACCEPT((byte) 1),
/** The record was not consumed successfully. Release it for another delivery attempt. */
RELEASE((byte) 2),
/** The record was not consumed successfully. Reject it and do not release it for another delivery attempt. */
REJECT((byte) 3),
/** Consumer needs more time to process the record. Renew the lease. */
RENEW((byte) 4); // New entry per KIP-1222
...
}
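These ids are the values carried in the AcknowledgeTypes arrays of ShareFetch and ShareAcknowledge, where 0 denotes a gap. Because the real enum body is elided above, the following standalone AckTypeSketch is a hypothetical stand-in showing one way to map a wire id back to a type while rejecting unknown ids such as Gap (0).

```java
import java.util.Arrays;

/**
 * Hypothetical stand-in for AcknowledgeType, illustrating the id mapping only.
 * Id 0 is reserved for Gap on the wire and is deliberately not a member.
 */
enum AckTypeSketch {
    ACCEPT((byte) 1),
    RELEASE((byte) 2),
    REJECT((byte) 3),
    RENEW((byte) 4); // added by KIP-1222

    final byte id;

    AckTypeSketch(byte id) { this.id = id; }

    /** Maps a wire id back to a type; unknown ids (including Gap = 0) are rejected. */
    static AckTypeSketch forId(byte id) {
        return Arrays.stream(values())
                .filter(t -> t.id == id)
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("Unknown acknowledge type id " + id));
    }
}
```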
- The acknowledgement type will be used in the existing ShareFetch and ShareAcknowledge RPCs, so new versions of both RPCs are needed. Additionally, we will add a field IsRenewAck, defaulting to false, to both RPCs. This field helps in optimising the broker-side processing of renew acknowledgements.
clients/src/main/resources/common/message/ShareFetchRequest.json
// ShareFetch
{
  "apiKey": 78,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareFetchRequest",
  // Version 0 was used for early access of KIP-932 in Apache Kafka 4.0 but removed in Apache Kafka 4.1.
  //
  // Version 1 is the initial stable version (KIP-932).
  //
  // Version 2 supports RENEW ack type
  "validVersions": "1-2",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The member ID." },
    { "name": "ShareSessionEpoch", "type": "int32", "versions": "0+",
      "about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
    { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
      "about": "The maximum time in milliseconds to wait for the response." },
    { "name": "MinBytes", "type": "int32", "versions": "0+",
      "about": "The minimum bytes to accumulate in the response." },
    { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff",
      "about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." },
    { "name": "MaxRecords", "type": "int32", "versions": "1+",
      "about": "The maximum number of records to fetch. This limit can be exceeded for alignment of batch boundaries." },
    { "name": "BatchSize", "type": "int32", "versions": "1+",
      "about": "The optimal number of records for batches of acquired records and acknowledgements." },
    { "name": "IsRenewAck", "type": "bool", "versions": "2+", "default": false,
      "about": "Whether renew type acknowledgement present in AcknowledgementBatches." }, // Version 2 supports RENEW ack type and this field serves as an indicator (KIP-1222)
    { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID.", "mapKey": true },
      { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+", "mapKey": true,
          "about": "The partition index." },
        { "name": "PartitionMaxBytes", "type": "int32", "versions": "0",
          "about": "The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "FirstOffset", "type": "int64", "versions": "0+",
            "about": "First offset of batch of records to acknowledge."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+",
            "about": "Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject,4:Renew."} // Version 2 supports RENEW ack type (KIP-1222)
        ]}
      ]}
    ]},
    { "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "0+",
      "about": "The partitions to remove from this share session.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions indexes to forget." }
    ]}
  ]
}
clients/src/main/resources/common/message/ShareAcknowledgeRequest.json
// ShareAcknowledge
{
  "apiKey": 79,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareAcknowledgeRequest",
  // Version 0 was used for early access of KIP-932 in Apache Kafka 4.0 but removed in Apache Kafka 4.1.
  //
  // Version 1 is the initial stable version (KIP-932).
  //
  // Version 2 will have RENEW ack type
  "validVersions": "1-2",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The member ID." },
    { "name": "ShareSessionEpoch", "type": "int32", "versions": "0+",
      "about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
    { "name": "IsRenewAck", "type": "bool", "versions": "2+", "default": false,
      "about": "Whether renew type acknowledgement present in AcknowledgementBatches." }, // Version 2 supports RENEW ack type and this field serves as an indicator (KIP-1222)
    { "name": "Topics", "type": "[]AcknowledgeTopic", "versions": "0+",
      "about": "The topics containing records to acknowledge.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID.", "mapKey": true },
      { "name": "Partitions", "type": "[]AcknowledgePartition", "versions": "0+",
        "about": "The partitions containing records to acknowledge.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+", "mapKey": true,
          "about": "The partition index." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "FirstOffset", "type": "int64", "versions": "0+",
            "about": "First offset of batch of records to acknowledge." },
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge." },
          { "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+",
            "about": "Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject,4:Renew" } // Version 2 supports RENEW ack type (KIP-1222)
        ]}
      ]}
    ]}
  ]
}
clients/src/main/resources/common/message/ShareAcknowledgeResponse.json (the response will contain a new field, AcquisitionLockTimeoutMs, from version 2 onwards)
{
  "apiKey": 79,
  "type": "response",
  "name": "ShareAcknowledgeResponse",
  // Version 0 was used for early access of KIP-932 in Apache Kafka 4.0 but removed in Apache Kafka 4.1.
  //
  // Version 1 is the initial stable version (KIP-932).
  //
  // Version 2 introduces Renew acknowledgements (KIP-1222).
  "validVersions": "1-2",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - SHARE_SESSION_NOT_FOUND (version 0+)
  // - INVALID_SHARE_SESSION_EPOCH (version 0+)
  // - NOT_LEADER_OR_FOLLOWER (version 0+)
  // - UNKNOWN_TOPIC_ID (version 0+)
  // - INVALID_RECORD_STATE (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level response error code." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "AcquisitionLockTimeoutMs", "type": "int32", "versions": "2+",
      "about": "The time in milliseconds for which the acquired records are locked." },
    { "name": "Responses", "type": "[]ShareAcknowledgeTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "mapKey": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+",
          "about": "The current leader of the partition.", "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch." }
        ]}
      ]}
    ]},
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "0+",
      "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
      { "name": "NodeId", "type": "int32", "versions": "0+", "mapKey": true, "entityType": "brokerId",
        "about": "The ID of the associated node." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The node's hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The node's port." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the node, or null if it has not been assigned to a rack." }
    ]}
  ]
}
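A client has to turn per-offset acknowledgements into AcknowledgementBatches and set IsRenewAck when any of them is RENEW. The sketch below uses hypothetical AckBatch/AckBatchEncoder types rather than Kafka's generated message classes. It groups contiguous offsets into one batch with one type per offset and starts a new batch at each skipped offset instead of emitting Gap (0) entries; the real client may lay batches out differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical client-side form of one AcknowledgementBatch (FirstOffset, LastOffset, AcknowledgeTypes). */
record AckBatch(long firstOffset, long lastOffset, byte[] acknowledgeTypes) { }

final class AckBatchEncoder {
    static final byte RENEW = 4; // wire id of RENEW in AcknowledgeTypes

    /** Groups offsets into batches of contiguous offsets, with one acknowledge type per offset. */
    static List<AckBatch> encode(Map<Long, Byte> acksByOffset) {
        List<AckBatch> batches = new ArrayList<>();
        List<Byte> types = new ArrayList<>();
        long first = -1;
        long prev = -1;
        for (Map.Entry<Long, Byte> e : new TreeMap<>(acksByOffset).entrySet()) {
            long offset = e.getKey();
            if (!types.isEmpty() && offset != prev + 1) { // a skipped offset closes the current batch
                batches.add(toBatch(first, prev, types));
                types.clear();
            }
            if (types.isEmpty()) first = offset;
            types.add(e.getValue());
            prev = offset;
        }
        if (!types.isEmpty()) batches.add(toBatch(first, prev, types));
        return batches;
    }

    /** IsRenewAck should be true when any batch carries at least one RENEW acknowledgement. */
    static boolean isRenewAck(List<AckBatch> batches) {
        for (AckBatch batch : batches) {
            for (byte type : batch.acknowledgeTypes()) {
                if (type == RENEW) return true;
            }
        }
        return false;
    }

    private static AckBatch toBatch(long first, long last, List<Byte> types) {
        byte[] array = new byte[types.size()];
        for (int i = 0; i < array.length; i++) array[i] = types.get(i);
        return new AckBatch(first, last, array);
    }
}
```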
A new method will be exposed in the clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumer.java interface to help the application determine the RENEW interval:
/**
 * Returns the acquisition lock timeout value in milliseconds for the last set of records fetched from the brokers.
 * This value is optional because it cannot be determined unless an application poll() results in a ShareFetch to a broker.
 */
public Optional<Integer> acquisitionLockTimeoutMs();
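One way an application might use acquisitionLockTimeoutMs() is to send a RENEW once a fixed fraction of the lock duration has elapsed. The RenewPolicy class and the 50% fraction in the test are hypothetical choices, not part of the KIP. The only input taken from the proposal is the Optional<Integer> returned by the new method.

```java
import java.util.Optional;

/** Hypothetical helper: decides when to send RENEW for a record that is still being processed. */
final class RenewPolicy {
    private final double renewFraction; // e.g. 0.5 = renew once half the lock duration has elapsed

    RenewPolicy(double renewFraction) {
        if (renewFraction <= 0 || renewFraction >= 1) {
            throw new IllegalArgumentException("renewFraction must be in (0, 1)");
        }
        this.renewFraction = renewFraction;
    }

    /**
     * @param lockTimeoutMs value of ShareConsumer.acquisitionLockTimeoutMs(); empty if not yet known
     * @param elapsedMs     time since the record was delivered or last renewed
     * @return true if the application should acknowledge the record with RENEW now
     */
    boolean shouldRenew(Optional<Integer> lockTimeoutMs, long elapsedMs) {
        // Without a known timeout no safe interval can be computed, so do not renew.
        return lockTimeoutMs
                .map(timeout -> elapsedMs >= (long) (timeout * renewFraction))
                .orElse(false);
    }
}
```

Renewing well before the deadline leaves headroom for the round trip of the poll() or commit call that carries the RENEW.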
The metric RecordAcknowledgementsPerSec, defined in KIP-1103, will gain an additional ackType tag value, Renew, as a side effect of introducing the RENEW acknowledgement type. The new definition of the metric is:
Rationale: Tracks the rate of records acknowledged per acknowledgement type.
Metric Name: RecordAcknowledgementsPerSec
Type: Meter
Group: ShareGroupMetrics
Tags: ackType:{Accept|Release|Reject|Renew}
Description: The rate per second of records acknowledged per acknowledgement type.
JMX Bean: kafka.server:type=ShareGroupMetrics,name=RecordAcknowledgementsPerSec,ackType={Accept|Release|Reject|Renew}
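Because ackType is a tag, each of its values corresponds to a separate MBean. The snippet below only builds the four ObjectName instances implied by the JMX Bean pattern, using the JDK's javax.management.ObjectName; the AckMetricNames helper is illustrative and not part of Kafka.

```java
import java.util.ArrayList;
import java.util.List;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

/** Illustrative helper: the MBean names for RecordAcknowledgementsPerSec, one per ackType tag value. */
final class AckMetricNames {
    // Renew is the tag value added as a side effect of KIP-1222.
    static final List<String> ACK_TYPES = List.of("Accept", "Release", "Reject", "Renew");

    static List<ObjectName> all() {
        List<ObjectName> names = new ArrayList<>();
        for (String ackType : ACK_TYPES) {
            try {
                names.add(new ObjectName(
                        "kafka.server:type=ShareGroupMetrics,name=RecordAcknowledgementsPerSec,ackType=" + ackType));
            } catch (MalformedObjectNameException e) {
                throw new IllegalStateException(e); // not expected for these fixed patterns
            }
        }
        return names;
    }
}
```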
Proposed Changes
KIP-932 added two new RPCs for fetching and acknowledging records: ShareFetch and ShareAcknowledge. The share consumer sends these RPCs when the application invokes poll(Duration) and commitSync(Duration)/commitAsync().
In implicit mode, an application calls poll() on the share consumer and receives a batch of records. These records are acknowledged on a subsequent call to poll() (piggybacked on ShareFetch). Calling commitSync()/commitAsync() sends a ShareAcknowledge, but only the ACCEPT acknowledgement type is used. Calling acknowledge(ConsumerRecord, AcknowledgeType) is illegal in this mode. This KIP does not target implicit mode.
In explicit mode, polling works the same, but the application must acknowledge each record explicitly using one of the AcknowledgeType values. It is here that RENEW comes into play, should the application need it.
Client side changes
If record processing might take a long time:
- The application should pass the new acknowledgement type AcknowledgeType.RENEW as an argument to acknowledge(ConsumerRecord, AcknowledgeType) for that record.
- It can then call either commitSync()/commitAsync() or poll() to send the acknowledgement to the server. The application can also use the new ShareConsumer.acquisitionLockTimeoutMs() method to decide the timeframe within which to make the RENEW call. If the subsequent call is poll(), the share consumer will create a ShareFetch request with IsRenewAck set to true and MaxWaitMs, MinBytes, MaxBytes and MaxRecords set to 0. If commitSync()/commitAsync() is called instead of poll(), IsRenewAck will be set appropriately in the ShareAcknowledge request as well.
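The poll() behaviour described in this list can be sketched as follows. PendingAck and ShareFetchParams are hypothetical stand-ins for the client's internal state and the ShareFetch request fields, not Kafka's generated ShareFetchRequestData.

```java
import java.util.Collection;

/** Hypothetical acknowledgement types queued by the application since the last request. */
enum PendingAck { ACCEPT, RELEASE, REJECT, RENEW }

/** Hypothetical subset of the ShareFetch request fields affected by KIP-1222. */
record ShareFetchParams(boolean isRenewAck, int maxWaitMs, int minBytes, int maxBytes, int maxRecords) {

    /**
     * Builds the parameters for the ShareFetch sent by poll(). If any piggybacked acknowledgement
     * is RENEW, the request only renews: IsRenewAck is true and every fetch limit is 0,
     * so no new records are requested.
     */
    static ShareFetchParams forPoll(Collection<PendingAck> pending,
                                    int maxWaitMs, int minBytes, int maxBytes, int maxRecords) {
        if (pending.contains(PendingAck.RENEW)) {
            return new ShareFetchParams(true, 0, 0, 0, 0);
        }
        return new ShareFetchParams(false, maxWaitMs, minBytes, maxBytes, maxRecords);
    }
}
```

In this sketch the configured fetch limits are only overridden for the one request that carries RENEW acknowledgements; a later poll() without them fetches normally.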
An application may try to send a RENEW acknowledgement to an old broker that does not support the new type. To handle this, on the next call to poll(), commitSync() or commitAsync(), the share consumer will verify whether the acknowledgement type is supported and notify the application through the appropriate callback handler. For a RENEW acknowledgement destined for a broker that does not support it, the share consumer will return UnsupportedVersionException (Errors.UNSUPPORTED_VERSION (35)). This is handled entirely on the client side: the share consumer uses the ApiVersions RPC to determine whether the acknowledgement type is valid for that broker.
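The client-side check can be sketched as a filter applied before a request is built. Everything here is hypothetical: RenewUnsupportedException stands in for Kafka's UnsupportedVersionException so the snippet compiles without the client library, and the broker's highest supported version of the carrying RPC is assumed to come from an earlier ApiVersions exchange.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

/** Hypothetical pending acknowledgement: an offset and whether its type is RENEW. */
record PendingAckEntry(long offset, boolean renew) { }

/** Stand-in for Kafka's UnsupportedVersionException, so this sketch compiles on its own. */
final class RenewUnsupportedException extends RuntimeException {
    RenewUnsupportedException(String message) { super(message); }
}

final class RenewVersionGate {
    static final short MIN_RENEW_VERSION = 2; // ShareFetch/ShareAcknowledge v2 introduce RENEW

    /**
     * Returns the acknowledgements that may be sent to a broker whose highest supported version
     * of the carrying RPC is brokerMaxVersion. RENEW acknowledgements that the broker cannot
     * understand are reported through onFailure (e.g. the commit callback) instead of being sent.
     */
    static List<PendingAckEntry> sendable(short brokerMaxVersion,
                                          List<PendingAckEntry> pending,
                                          BiConsumer<Long, RuntimeException> onFailure) {
        List<PendingAckEntry> out = new ArrayList<>();
        for (PendingAckEntry ack : pending) {
            if (ack.renew() && brokerMaxVersion < MIN_RENEW_VERSION) {
                onFailure.accept(ack.offset(), new RenewUnsupportedException(
                        "Broker supports up to version " + brokerMaxVersion
                                + "; RENEW needs version " + MIN_RENEW_VERSION));
            } else {
                out.add(ack);
            }
        }
        return out;
    }
}
```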
When a record is renewed, its state in the broker's SharePartition does not change. This means that on subsequent polls by the same member, the broker will not return the record until the lock timeout expires. As a result, the application might not get any update about that record. To remedy this, we propose buffering RENEW-acknowledged records on the share consumer and returning them on subsequent polls until they are re-delivered by the broker, at which point the buffer entry can be cleared.
We assume the application uses appropriate concurrency constructs to process records in separate threads.
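The buffering of RENEW-acknowledged records described above can be sketched with an insertion-ordered map. PartitionOffset and RenewedRecordBuffer are hypothetical names; in the real client R would be a ConsumerRecord. Clearing the entry on a later ACCEPT, RELEASE or REJECT is an assumption beyond the text, and the buffer is assumed to be touched only by the thread calling poll().

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical key for a record: topic-partition name plus offset. */
record PartitionOffset(String topicPartition, long offset) { }

/** Hypothetical client-side buffer of RENEW-acknowledged records. Not thread-safe. */
final class RenewedRecordBuffer<R> {
    private final Map<PartitionOffset, R> renewed = new LinkedHashMap<>();

    /** The application acknowledged the record with RENEW: keep returning it on later polls. */
    void onRenew(PartitionOffset key, R record) {
        renewed.put(key, record);
    }

    /** Assumption: a later ACCEPT, RELEASE or REJECT of the same record also ends buffering. */
    void onFinalAck(PartitionOffset key) {
        renewed.remove(key);
    }

    /**
     * Merges a poll's fetched records with the buffered ones. A record the broker re-delivers
     * is dropped from the buffer, so it is returned once, in its re-delivered form.
     */
    List<R> onPoll(Map<PartitionOffset, R> fetched) {
        renewed.keySet().removeAll(fetched.keySet());
        List<R> result = new ArrayList<>(fetched.values());
        result.addAll(renewed.values());
        return result;
    }

    int size() {
        return renewed.size();
    }
}
```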
Broker side changes
On the broker side, receiving a RENEW acknowledgement for a specific batch or offset will cancel the existing acquisition lock timeout task and start a new one whose timeout is the value of group.share.record.lock.duration.ms. There is one caveat here. If the application causes a ShareFetch RPC to be sent (a poll() call) with RENEW acknowledgements piggybacked on it, the renewed acquisition lock could time out again before the poll() completes, for example while the broker waits for new records to fetch. To avoid this, the broker will not return any new records on ShareFetch requests containing RENEW acknowledgements (indicated by the IsRenewAck field). This ensures that the poll() completes promptly and the acknowledgement results can be returned to the application immediately. A true value of IsRenewAck makes it clear that a ShareFetch request is renewing rather than acquiring records. If IsRenewAck is set to true, MaxWaitMs, MinBytes, MaxBytes and MaxRecords must be set to 0. The value of IsRenewAck also lets the broker process the request more efficiently.
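A minimal broker-side sketch of the two rules above: validating a renewing ShareFetch and restarting the acquisition lock. FetchLimits and AcquisitionLock are hypothetical names; the JDK ScheduledExecutorService stands in for the broker's timer, and the duration passed to renew() stands in for the current group.share.record.lock.duration.ms value.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical subset of a ShareFetch request used to validate IsRenewAck. */
record FetchLimits(boolean isRenewAck, int maxWaitMs, int minBytes, int maxBytes, int maxRecords) {
    /** A renewing ShareFetch must carry 0 for every fetch limit. */
    boolean valid() {
        return !isRenewAck || (maxWaitMs == 0 && minBytes == 0 && maxBytes == 0 && maxRecords == 0);
    }
}

/** Hypothetical acquisition lock for one acquired batch or offset. */
final class AcquisitionLock {
    private final ScheduledExecutorService timer;
    private final Runnable onExpiry; // e.g. move the record back to Available
    private ScheduledFuture<?> task;

    AcquisitionLock(ScheduledExecutorService timer, Runnable onExpiry, long durationMs) {
        this.timer = timer;
        this.onExpiry = onExpiry;
        this.task = timer.schedule(onExpiry, durationMs, TimeUnit.MILLISECONDS);
    }

    /**
     * RENEW: cancel the pending timeout task and schedule a new one with the given duration.
     * Returns false if the task already ran or was cancelled, i.e. there is no lock to renew.
     */
    synchronized boolean renew(long durationMs) {
        if (!task.cancel(false)) {
            return false;
        }
        task = timer.schedule(onExpiry, durationMs, TimeUnit.MILLISECONDS);
        return true;
    }

    synchronized ScheduledFuture<?> currentTask() {
        return task;
    }
}
```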
When a share consumer sends a ShareAcknowledge request for renewal, the response should include the acquisition lock timeout value, because the timeout is configurable and can change. For example, a record may be fetched with a specific timeout, the timeout value may then be updated on the broker, and the share consumer may then issue a ShareAcknowledge RENEW for that record. The renewed timeout must then be communicated to the consumer promptly. The new field AcquisitionLockTimeoutMs in the ShareAcknowledge response fulfils this use case.
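On the client, AcquisitionLockTimeoutMs could be turned into a per-record deadline. The LockDeadlines class below is hypothetical and keys records by offset only for brevity. It measures from the time the request was sent: the broker restarts the lock when it processes the RENEW, which happens later, so the computed deadline errs on the early (safe) side.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical client-side tracker of when renewed acquisition locks will expire. */
final class LockDeadlines {
    private final Map<Long, Long> deadlineMsByOffset = new HashMap<>();

    /** @param acquisitionLockTimeoutMs the AcquisitionLockTimeoutMs field of the ShareAcknowledge response */
    void onRenewAcknowledged(long offset, long requestSentAtMs, int acquisitionLockTimeoutMs) {
        deadlineMsByOffset.put(offset, requestSentAtMs + acquisitionLockTimeoutMs);
    }

    /** Milliseconds left before the renewed lock expires (negative once past), or empty if unknown. */
    OptionalLong remainingMs(long offset, long nowMs) {
        Long deadline = deadlineMsByOffset.get(offset);
        return deadline == null ? OptionalLong.empty() : OptionalLong.of(deadline - nowMs);
    }
}
```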
If a RENEW request received by a broker is invalid and an error is returned, the application should use the existing acknowledgement commit callback to be notified of it. Specifically, the application should register a callback with ShareConsumer.setAcknowledgementCommitCallback(AcknowledgementCommitCallback) and handle any errors there. This is the usual mechanism for checking commit status in explicit mode, and this KIP does not change this code path.
Ensuring progress
An application that renews a record repeatedly could hamper progress on the share group, causing head-of-line blocking. We are not currently planning to restrict the number of allowed renewals, as that would go against the intended purpose of this KIP. However, we will log renewal information on both the server and client side to help with debugging and with disciplining any misbehaving applications or share consumers in the share group.
Compatibility, Deprecation, and Migration Plan
- The existing functionality is not modified. Applications using share consumers in explicit mode will get a new capability of renewing records.
- We are increasing the version numbers of the ShareFetch and ShareAcknowledge requests. Currently, brokers handle unknown acknowledgement types gracefully by returning Errors.INVALID_REQUEST to the client. This will also be the case when an old broker receives a RENEW request, so no change is required in this path.
- Although brokers running old and new code can both handle RENEW (returning an error vs. renewing the lock), the share consumer should determine from the broker's API versions whether to send the RENEW acknowledgement at all (see the Client side changes section).
Test Plan
- We will add new unit tests, mainly in core/src/test/java/kafka/server/share/SharePartitionTest.java, to verify batch-level and offset-level renewal as well as handling of mixed acknowledgement types.
- New integration tests will be added in clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java to verify handling of mixed and RENEW acknowledgement types.