...
- The acknowledgement type will be used in the existing ShareFetch and ShareAcknowledge RPCs. Hence, we must add new versions for them. Additionally, we will add a field IsRenewAck, defaulting to false, to both the RPCs. This field will help in optimising the broker-side implementation for processing renew acknowledgements.

clients/src/main/resources/common/message/ShareFetchRequest.json

Code Block
// ShareFetch
{
  "apiKey": 78,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareFetchRequest",
  // Version 0 was used for early access of KIP-932 in Apache Kafka 4.0 but removed in Apache Kafka 4.1.
  //
  // Version 1 is the initial stable version (KIP-932).
  //
  // Version 2 supports RENEW ack type
  "validVersions": "1-2",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The member ID." },
    { "name": "ShareSessionEpoch", "type": "int32", "versions": "0+",
      "about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
    { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
      "about": "The maximum time in milliseconds to wait for the response." },
    { "name": "MinBytes", "type": "int32", "versions": "0+",
      "about": "The minimum bytes to accumulate in the response." },
    { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff",
      "about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." },
    { "name": "MaxRecords", "type": "int32", "versions": "1+",
      "about": "The maximum number of records to fetch. This limit can be exceeded for alignment of batch boundaries." },
    { "name": "BatchSize", "type": "int32", "versions": "1+",
      "about": "The optimal number of records for batches of acquired records and acknowledgements." },
    { "name": "IsRenewAck", "type": "bool", "versions": "2+", "default": false,
      "about": "Whether renew type acknowledgement present in AcknowledgementBatches." }, // Version 2 supports RENEW ack type and this field serves as an indicator (KIP-1222)
    { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID.", "mapKey": true },
      { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+", "mapKey": true,
          "about": "The partition index." },
        { "name": "PartitionMaxBytes", "type": "int32", "versions": "0",
          "about": "The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "FirstOffset", "type": "int64", "versions": "0+",
            "about": "First offset of batch of records to acknowledge."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+",
            "about": "Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject,4:Renew."} // Version 2 supports RENEW ack type (KIP-1222)
        ]}
      ]}
    ]},
    { "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "0+",
      "about": "The partitions to remove from this share session.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions indexes to forget." }
    ]}
  ]
}

clients/src/main/resources/common/message/ShareAcknowledgeRequest.json

Code Block
// ShareAcknowledge
{
  "apiKey": 79,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareAcknowledgeRequest",
  // Version 0 was used for early access of KIP-932 in Apache Kafka 4.0 but removed in Apache Kafka 4.1.
  //
  // Version 1 is the initial stable version (KIP-932).
  //
  // Version 2 will have RENEW ack type
  "validVersions": "1-2",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The member ID." },
    { "name": "ShareSessionEpoch", "type": "int32", "versions": "0+",
      "about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
    { "name": "IsRenewAck", "type": "bool", "versions": "2+", "default": false,
      "about": "Whether renew type acknowledgement present in AcknowledgementBatches." }, // Version 2 supports RENEW ack type and this field serves as an indicator (KIP-1222)
    { "name": "Topics", "type": "[]AcknowledgeTopic", "versions": "0+",
      "about": "The topics containing records to acknowledge.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID.", "mapKey": true },
      { "name": "Partitions", "type": "[]AcknowledgePartition", "versions": "0+",
        "about": "The partitions containing records to acknowledge.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+", "mapKey": true,
          "about": "The partition index." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "FirstOffset", "type": "int64", "versions": "0+",
            "about": "First offset of batch of records to acknowledge." },
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge." },
          { "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+",
            "about": "Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject,4:Renew" } // Version 2 supports RENEW ack type (KIP-1222)
        ]}
      ]}
    ]}
  ]
}
clients/src/main/resources/common/message/ShareAcknowledgeResponse.json (the response will contain a new field, AcquisitionLockTimeoutMs, from version 2 onwards).

Code Block
{
  "apiKey": 79,
  "type": "response",
  "name": "ShareAcknowledgeResponse",
  // Version 0 was used for early access of KIP-932 in Apache Kafka 4.0 but removed in Apache Kafka 4.1.
  //
  // Version 1 is the initial stable version (KIP-932).
  //
  // Version 2 introduces Renew acknowledgements (KIP-1222).
  "validVersions": "1-2",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - SHARE_SESSION_NOT_FOUND (version 0+)
  // - INVALID_SHARE_SESSION_EPOCH (version 0+)
  // - NOT_LEADER_OR_FOLLOWER (version 0+)
  // - UNKNOWN_TOPIC_ID (version 0+)
  // - INVALID_RECORD_STATE (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level response error code." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "AcquisitionLockTimeoutMs", "type": "int32", "versions": "2+",
      "about": "The time in milliseconds for which the acquired records are locked." },
    { "name": "Responses", "type": "[]ShareAcknowledgeTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "mapKey": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+",
          "about": "The current leader of the partition.", "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch." }
        ]}
      ]}
    ]},
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "0+",
      "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
      { "name": "NodeId", "type": "int32", "versions": "0+", "mapKey": true, "entityType": "brokerId",
        "about": "The ID of the associated node." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The node's hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The node's port." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the node, or null if it has not been assigned to a rack." }
    ]}
  ]
}
New method exposed in the clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumer.java interface to help the application determine the RENEW interval.

Code Block
/**
 * Returns the acquisition lock timeout value in milliseconds for the last set of records fetched from the brokers.
 * This is an optional field because, unless an application poll() results in a ShareFetch to a broker, this value cannot be determined.
 */
public Optional<Integer> acquisitionLockTimeoutMs();

An additional tag for the metric RecordAcknowledgementsPerSec, defined in KIP-1103, will be introduced as a side effect of introducing the RENEW ack type. The new definition of the metric is:

Rationale: Tracks the rate of records acknowledged per acknowledgement type.
Metric Name: RecordAcknowledgementsPerSec
Type: Meter
Group: ShareGroupMetrics
Tags: ackType:{Accept|Release|Reject|Renew}
Description: The rate per second of records acknowledged per acknowledgement type.
JMX Bean: kafka.server:type=ShareGroupMetrics,name=RecordAcknowledgementsPerSec,ackType={Accept|Release|Reject|Renew}
Proposed Changes
KIP-932 added two new RPCs for record fetching and acknowledging: ShareFetch and ShareAcknowledge. From the share consumer, these RPCs are called when an application invokes poll(Duration) and commitSync(Duration)/commitAsync().
...
On the broker side, receiving a RENEW acknowledgement for a specific batch or offset will cancel the existing acquisition lock timeout task and start a new one with the timeout value given by group.share.record.lock.duration.ms. There is one caveat. If the application causes a ShareFetch RPC to be sent (a poll() call) on which RENEW acknowledgements are piggybacked, the renewed acquisition lock could time out again before the poll() completes. To avoid this, the broker will not return any new records for ShareFetch requests containing RENEW acknowledgements (indicated by the IsRenewAck field). This guarantees that the poll() completes promptly and that the acknowledgement response is returned to the application immediately. A true value of IsRenewAck makes it clear that a ShareFetch request is renewing rather than acquiring records. If IsRenewAck is set to true, MaxWaitMs, MinBytes, MaxBytes and MaxRecords must be set to 0. Also, by looking at the value of IsRenewAck, the broker can process the request more efficiently.
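The renewal behaviour described above can be modelled with a small, self-contained sketch. This is not the broker implementation: the class AcquisitionLockSketch, its methods, and the explicit nowMs clock parameter are hypothetical names chosen for illustration, and rejecting a RENEW for a record whose lock has already expired is an assumption rather than something this KIP specifies here.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative model only; names are hypothetical and this is not broker code. */
final class AcquisitionLockSketch {
    // offset -> time (ms) at which the acquisition lock expires
    private final Map<Long, Long> lockDeadlineMs = new HashMap<>();
    // Stands in for group.share.record.lock.duration.ms
    private long lockDurationMs;

    AcquisitionLockSketch(long lockDurationMs) {
        this.lockDurationMs = lockDurationMs;
    }

    /** Models a configuration update on the broker. */
    void setLockDurationMs(long lockDurationMs) {
        this.lockDurationMs = lockDurationMs;
    }

    /** Acquires a record at time nowMs and starts its lock timeout. */
    void acquire(long offset, long nowMs) {
        lockDeadlineMs.put(offset, nowMs + lockDurationMs);
    }

    /**
     * RENEW: drop the old deadline and start a fresh one with the current lock duration.
     * Returns false if the record is not locked or its lock already expired (assumption).
     */
    boolean renew(long offset, long nowMs) {
        Long deadline = lockDeadlineMs.get(offset);
        if (deadline == null || deadline <= nowMs) {
            return false;
        }
        lockDeadlineMs.put(offset, nowMs + lockDurationMs);
        return true;
    }

    boolean isLocked(long offset, long nowMs) {
        Long deadline = lockDeadlineMs.get(offset);
        return deadline != null && deadline > nowMs;
    }

    /** A ShareFetch with IsRenewAck=true must carry 0 in these four fields, per the text above. */
    static boolean isValidRenewFetch(boolean isRenewAck, int maxWaitMs, int minBytes,
                                     int maxBytes, int maxRecords) {
        return !isRenewAck
            || (maxWaitMs == 0 && minBytes == 0 && maxBytes == 0 && maxRecords == 0);
    }
}
```

Passing the clock in explicitly keeps the model deterministic. A real broker would use its timer infrastructure to cancel and reschedule the timeout task instead.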
If a share consumer sends a ShareAcknowledge request for renewal, the response should provide the value of the acquisition lock timeout, because the timeout is configurable. A record could be fetched with a specific timeout, the timeout value could then be updated on the broker, and the share consumer could then issue a ShareAcknowledge RENEW call for that record. The renewed timeout must then be communicated promptly to the consumer. The new field AcquisitionLockTimeoutMs in the ShareAcknowledge response fulfils this use case. Also, by looking at the value of IsRenewAck, the broker can process the request more efficiently.
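As an illustration of how an application might turn the reported timeout into a RENEW interval, here is a minimal sketch. The helper RenewIntervalSketch.renewIntervalMs and the idea of renewing at a fixed fraction of the timeout are assumptions made for this example and are not part of the KIP. The Optional<Integer> input mirrors the return type of acquisitionLockTimeoutMs().

```java
import java.util.Optional;

/** Hypothetical helper; not part of the ShareConsumer API. */
final class RenewIntervalSketch {
    /**
     * Derives a renew interval as a fraction of the acquisition lock timeout.
     * Returns empty when the timeout is not yet known (no ShareFetch has completed).
     */
    static Optional<Long> renewIntervalMs(Optional<Integer> acquisitionLockTimeoutMs,
                                          double fraction) {
        if (fraction <= 0.0 || fraction >= 1.0) {
            throw new IllegalArgumentException("fraction must be strictly between 0 and 1");
        }
        return acquisitionLockTimeoutMs
            .filter(timeoutMs -> timeoutMs > 0)
            // Never return an interval below 1 ms
            .map(timeoutMs -> Math.max(1L, (long) (timeoutMs * fraction)));
    }
}
```

With a 30000 ms timeout and a fraction of 0.5, the helper yields 15000 ms. Because the timeout can change on the broker, an application would recompute the interval whenever a new value is reported.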
If the RENEW request received by a broker is invalid and an error is returned, the application should use the existing acknowledgement commit callback to detect it. Specifically, the application should set a commit response handler callback with ShareConsumer.setAcknowledgementCommitCallback(AcknowledgementCommitCallback) and handle any errors. This is the usual mechanism for checking commit status in explicit mode, and this KIP does not change this code path.
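The error-handling pattern can be sketched without the Kafka client on the classpath. The interface CommitCallbackSketch below is a simplified, hypothetical stand-in for the callback registered through setAcknowledgementCommitCallback, with partitions represented as plain strings. RenewErrorTracker is likewise an illustrative name, not an existing class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Simplified stand-in for the commit callback shape; not the Kafka interface itself. */
interface CommitCallbackSketch {
    void onComplete(Map<String, Set<Long>> offsetsByPartition, Exception exception);
}

/** Remembers partitions whose acknowledgements (including RENEW) failed. */
final class RenewErrorTracker implements CommitCallbackSketch {
    private final List<String> failedPartitions = new ArrayList<>();

    @Override
    public void onComplete(Map<String, Set<Long>> offsetsByPartition, Exception exception) {
        if (exception == null) {
            return; // acknowledgements were committed successfully
        }
        // On error, record the affected partitions so the application can react,
        // for example by no longer assuming the locks on those records were extended.
        failedPartitions.addAll(offsetsByPartition.keySet());
    }

    List<String> failedPartitions() {
        return List.copyOf(failedPartitions);
    }
}
```

In an application, the equivalent logic would live in the callback passed to ShareConsumer.setAcknowledgementCommitCallback.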
...