Status

Current state: Released in 4.1

Discussion thread: https://lists.apache.org/thread/gyvfrysoodymgxgwnh4x2g2fs6y5247g

Vote thread: https://lists.apache.org/thread/fb5xxw9313vrljsfqsp2b2cd6gkfldjo

JIRA: KAFKA-19223

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Cluster Metadata and KRaft

Many users have observed that their admin operations can take up to 500 milliseconds after they complete to become visible to other clients. See KAFKA-14145 and KAFKA-18818. This is because KRaft parks FETCH requests for 500ms even if the high-watermark (HWM) on the remote replica is less than the HWM on the leader.

The FETCH RPC is used to replicate both the records and the HWM for the partition. The FETCH response contains the latest HWM known by the leader and any bytes written to the partition after the FetchOffset in the FETCH request. In KRaft all of the replicas fetch uncommitted data. When the FetchOffset in the FETCH request is equal to the log end offset (LEO), KRaft parks those requests until new data is available. Once the active controller appends new records, KRaft completes all of the parked requests with the new appended records.

KRaft is not able to determine whether it should park FETCH requests based on the HWM because the KRaft leader doesn't know which HWM has been replicated to each remote replica. As a result, if the HWM has changed but there is no new data after the FetchOffset, KRaft parks the FETCH request, delaying HWM replication by up to 500ms, the maximum amount of time that the KRaft leader will park a FETCH request when there are no records after the FetchOffset.
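The parking rule described above can be illustrated with a minimal sketch; the class and method names below are purely illustrative and not actual Kafka internals. Before this KIP, the decision looks only at the FetchOffset versus the log end offset, so a fetch from a fully caught-up replica is parked for the full wait time even when only the HWM has advanced:

```java
// Illustrative sketch of the pre-KIP parking rule: the leader parks a FETCH
// solely by comparing the FetchOffset to its log end offset (LEO). The HWM
// plays no part, so an HWM-only change waits out the maximum park time.
final class PreKipParkingRule {
    /** Park only when the follower has already fetched everything (no new records). */
    static boolean parkFetch(long fetchOffset, long logEndOffset) {
        return fetchOffset == logEndOffset;
    }
}
```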

Public Interfaces

This KIP proposes adding the replica's HWM to the FETCH request so that the replica receiving the FETCH request can use the remote replica's HWM to determine if FETCH requests should be completed immediately or parked until there is new data or a new HWM.

Metadata Version

A new metadata version will be added. The replica fetcher will send the new FETCH RPC version (18) once the metadata version has been finalized to this new metadata version.

Fetch

Request

Add version 18 of the RPC with a new field, HighWatermark, which the replica sets to its known HWM, or -1 if the HWM is not known. The default value for the field is the largest int64, 9223372036854775807.

The remote HWM can be in the following states:

  1. The remote replica doesn't know the HWM. The value of the HighWatermark field is -1.
  2. The remote replica's latest known HWM. The value of the HighWatermark field is between 0 and 9223372036854775807, inclusive.
  3. The remote replica doesn't support version 18 of the FETCH RPC. The value seen by the receiving replica will be 9223372036854775807.

For remote replicas (followers and consumers) that don't support or don't set the HighWatermark field, Kafka should behave the same as it does today. In other words, Kafka should not consider the HighWatermark field when determining whether to complete the FETCH request or park the FETCH request. Setting the default value of the HighWatermark field to 9223372036854775807 achieves that because the receiving replica can use the predicate "local HWM <= remote HWM" to determine if it may park the fetch request.
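The predicate can be sketched as follows; this is a minimal illustration, not Kafka's actual implementation, and the names are hypothetical. Note how the schema default of the largest int64 makes the predicate always hold for senders that omit the field, preserving today's parking behavior:

```java
// Minimal sketch of the HWM parking predicate described above.
final class HwmParkingPredicate {
    // Value the receiver sees when the sender does not serialize the field
    // (e.g. a pre-version-18 sender): the schema default, largest int64.
    static final long DEFAULT_HWM = Long.MAX_VALUE;

    /** An empty fetch may be parked while the remote's reported HWM has caught up. */
    static boolean mayPark(long localHwm, long remoteHwm) {
        // For pre-18 senders remoteHwm is DEFAULT_HWM, so this is always true
        // and the behavior prior to this KIP is preserved.
        return localHwm <= remoteHwm;
    }
}
```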

{
  "apiKey": 1,
  "type": "request",
  "listeners": ["broker", "controller"],
  "name": "FetchRequest",
  "validVersions": "4-18",
  "flexibleVersions": "12+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null",
      "taggedVersions": "12+", "tag": 0, "ignorable": true,
      "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." },
    { "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", "entityType": "brokerId",
      "about": "The broker ID of the follower, of -1 if this request is from a consumer." },
    { "name": "ReplicaState", "type": "ReplicaState", "versions": "15+", "taggedVersions": "15+", "tag": 1,
      "about": "The state of the replica in the follower.", "fields": [
      { "name": "ReplicaId", "type": "int32", "versions": "15+", "default": "-1", "entityType": "brokerId",
        "about": "The replica ID of the follower, or -1 if this request is from a consumer." },
      { "name": "ReplicaEpoch", "type": "int64", "versions": "15+", "default": "-1",
        "about": "The epoch of this follower, or -1 if not available." }
    ]},
    { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
      "about": "The maximum time in milliseconds to wait for the response." },
    { "name": "MinBytes", "type": "int32", "versions": "0+",
      "about": "The minimum bytes to accumulate in the response." },
    { "name": "MaxBytes", "type": "int32", "versions": "3+", "default": "0x7fffffff", "ignorable": true,
      "about": "The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored." },
    { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": true,
      "about": "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To   be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records." },
    { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": true,
      "about": "The fetch session ID." },
    { "name": "SessionEpoch", "type": "int32", "versions": "7+", "default": "-1", "ignorable": true,
      "about": "The fetch session epoch, which is used for ordering requests in a session." },
    { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Topic", "type": "string", "versions": "0-12", "entityType": "topicName", "ignorable": true,
        "about": "The name of the topic to fetch." },
      { "name": "TopicId", "type": "uuid", "versions": "13+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "9+", "default": "-1", "ignorable": true,
          "about": "The current leader epoch of the partition." },
        { "name": "FetchOffset", "type": "int64", "versions": "0+",
          "about": "The message offset." },
        { "name": "LastFetchedEpoch", "type": "int32", "versions": "12+", "default": "-1", "ignorable": false,
          "about": "The epoch of the last fetched record or -1 if there is none."},
        { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
          "about": "The earliest available offset of the follower replica.  The field is only used when the request is sent by the follower."},
        { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch from this partition.  See KIP-74 for cases where this limit may not be honored." },
        { "name": "ReplicaDirectoryId", "type": "uuid", "versions": "17+", "taggedVersions": "17+", "tag": 0, "ignorable": true,
          "about": "The directory id of the follower fetching." },
        { "name": "HighWatermark", "type": "int64", "versions": "18+", "default": "9223372036854775807", "taggedVersions": "18+", "tag": 1, "ignorable": true,
          "about": "The high-watermark known by the replica. -1 if the high-watermark is not known." }
      ]}
    ]},
    { "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "7+", "ignorable": false,
      "about": "In an incremental fetch request, the partitions to remove.", "fields": [
      { "name": "Topic", "type": "string", "versions": "7-12", "entityType": "topicName", "ignorable": true,
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "13+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]int32", "versions": "7+",
        "about": "The partitions indexes to forget." }
    ]},
    { "name": "RackId", "type":  "string", "versions": "11+", "default": "", "ignorable": true,
      "about": "Rack ID of the consumer making this request."}
  ]
}

Response

No schema changes to the response. The version will be increased to 18 to match the request version.

Handling

KRaft

The current implementation always includes the HWM in the FETCH response, and that remains true with this KIP. What this KIP changes is when a FETCH request may be parked and when a parked FETCH request may be completed.

If the response is empty and the remote replica's HWM is greater than or equal to the leader's HWM, the FETCH request is parked.

When the leader's HWM changes, the leader may complete all parked FETCH requests.
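The park/complete flow above can be sketched in a small illustrative model; the class below is hypothetical and not actual Kafka code. An empty fetch from a caught-up replica is parked, a fetch from a replica with a stale HWM is completed immediately (so the response carries the newer HWM), and advancing the leader's HWM completes everything parked:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of the KRaft leader's handling of empty fetches.
final class LeaderFetchState {
    private long highWatermark;
    private final List<Long> parkedRemoteHwms = new ArrayList<>();

    LeaderFetchState(long initialHwm) {
        this.highWatermark = initialHwm;
    }

    /** Returns true if an empty fetch was parked (remote HWM caught up). */
    boolean handleEmptyFetch(long remoteHwm) {
        if (remoteHwm >= highWatermark) {
            parkedRemoteHwms.add(remoteHwm);
            return true;
        }
        // Complete immediately so the remote replica learns the newer HWM.
        return false;
    }

    /** Advancing the HWM completes all parked fetches; returns how many. */
    int advanceHwm(long newHwm) {
        highWatermark = newHwm;
        int completed = parkedRemoteHwms.size();
        parkedRemoteHwms.clear();
        return completed;
    }
}
```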

Sending

When sending a FETCH request, the replica will include its HWM in the request's HighWatermark field. If the remote replica supports version 18 of the RPC, the HWM will be serialized. If the remote replica doesn't support version 18, the HighWatermark field will be ignored and not serialized.
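The version gate can be sketched as follows; this is an illustrative helper, not Kafka's serialization code. Because HighWatermark is a tagged field introduced in version 18, older negotiated versions simply never carry it on the wire, and the receiver falls back to the schema default:

```java
// Illustrative sketch of version-gated inclusion of the HighWatermark field.
final class FetchHwmSerialization {
    static final short HWM_FIELD_MIN_VERSION = 18;
    static final long DEFAULT_HWM = Long.MAX_VALUE; // schema default

    /**
     * Returns the HWM value the receiver will observe: the sender's local HWM
     * when the negotiated version supports the field, otherwise the schema
     * default (the field is simply absent on the wire for older versions).
     */
    static long observedHwm(short negotiatedVersion, long localHwm) {
        if (negotiatedVersion >= HWM_FIELD_MIN_VERSION) {
            return localHwm;
        }
        return DEFAULT_HWM;
    }
}
```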

Replica Fetcher and Kafka Consumers

The Kafka consumer and the replica fetcher should not set the HighWatermark field. The broker and replica manager will ignore the HighWatermark field if it is set.

Compatibility, Deprecation, and Migration Plan

Replicas that do not support version 18 of FETCH will send a previous version without the HighWatermark field. The default value for the HighWatermark field will be the maximum value of int64 (9223372036854775807). This means that, from the leader's point of view, the remote replica's HWM will always be greater than its own HWM, so the leader will never complete a FETCH request because of the HWM value. This matches the behavior prior to this KIP.

If the remote replica supports version 18 of FETCH but the leader does not, the HighWatermark field will be ignored. The leader will handle the FETCH request as implemented in previous versions of Kafka.

Test Plan

For KRaft, this feature will mainly be tested using the KRaft protocol unit tests in KafkaRaftClientFetchTest.
