...

The time taken to read the indexes from remote storage is non-deterministic, and the query is handled by the request-handler threads. If there are many concurrent LIST_OFFSETS queries and most of the request-handler threads are busy serving them, other high-priority requests such as FETCH and PRODUCE may be queued and starve. This can lead to higher latency when producing and consuming messages. To overcome this, we plan to introduce a delayed remote list offsets purgatory that delegates the request from the request-handler threads to the remote-log-reader threads. The request is then handled asynchronously, freeing up the request-handler threads to serve other high-priority requests.
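The delegation described above can be illustrated with a minimal sketch. It is not the broker implementation: the class name `AsyncOffsetLookupSketch`, the method `lookupOffsetAsync`, and the plain fixed-size pool standing in for the remote-log-reader threads are all assumptions made for illustration. The point is only that the calling (request-handler) thread schedules the slow lookup and returns a future immediately.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class AsyncOffsetLookupSketch {
    // Hypothetical stand-in for the remote-log-reader thread pool.
    private final ExecutorService remoteLogReaderPool;

    public AsyncOffsetLookupSketch(int readerThreads) {
        this.remoteLogReaderPool = Executors.newFixedThreadPool(readerThreads);
    }

    // Intended to be called on a request-handler thread: the potentially slow
    // remote index lookup is scheduled on the reader pool and this method
    // returns right away, so the caller is free to serve other requests.
    public CompletableFuture<Long> lookupOffsetAsync(Supplier<Long> remoteIndexLookup) {
        return CompletableFuture.supplyAsync(remoteIndexLookup, remoteLogReaderPool);
    }

    public void shutdown() {
        remoteLogReaderPool.shutdown();
    }
}
```

A caller would attach a completion callback to the returned future (for example with `whenComplete`) to send the response once the lookup finishes, rather than blocking on it.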

Public Interfaces

...

  • The

...

A public interface is any change to the following:

  • The below broker config will be added to the RemoteLogManagerConfig
    • remote.list.offsets.request.timeout.ms: to define the timeout for the remote list offsets requests. This will be a dynamic config and defaults to 30 seconds.
  • A metric that reports expired DelayedRemoteListOffsets requests.
    • kafka.server:type=DelayedRemoteListOffsetsMetrics,name=ExpiresPerSec,topic=([-.\w]+),partition=([0-9]+)
  • PurgatoryName = RemoteListOffsets
  • A new TimeoutMs field will be added to the ListOffsetsRequest to convey the timeout to the server.

Proposed Changes

The LIST_OFFSETS API is invoked by:

...

  1. Proposing a new dynamic remote.list.offsets.request.timeout.ms broker config which defaults to 30 seconds. If a remote LIST_OFFSETS request cannot be served within the timeout, a TimeoutException will be returned to the caller. The client won't have the ability to adjust the timeout. The cluster operator can increase the remote LIST_OFFSETS request timeout when the remote storage is degraded. We can emit an expired DelayedRemoteListOffsets metric, based on which the cluster operator/admin can increase the timeout on the server.
  2. A new field "TimeoutMs" will be added to the ListOffsetsRequest. Newer clients can supply the maximum time to wait for a response from the server. Adding the timeout to the ListOffsetsRequest lets clients pick a suitable wait time. For older clients, or when the timeout is not supplied, we will use the server config "remote.list.offsets.request.timeout.ms".
  3. One LIST_OFFSETS request is served by one request-handler thread. When there are multiple partitions/topics in the request, the partitions in the request are processed serially. With the DelayedRemoteListOffsets purgatory, we will create a task for each partition in the request and add it to the queue. The remote-log-reader threads can pick up the tasks when they are available, and the request gets served asynchronously and in parallel.
  4. We will be reusing the remote-log-reader thread pool to process the remote LIST_OFFSETS requests.
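The timeout selection and per-partition fan-out in the list above can be sketched as follows. This is a hedged illustration, not the proposed broker code: `RemoteListOffsetsSketch`, `resolveTimeoutMs`, `listOffsets`, and the `OFFSET_ON_TIMEOUT` sentinel are hypothetical names, a negative `TimeoutMs` is assumed to mean "not supplied", and a timed-out partition is recorded with a sentinel value here purely to keep the example short (the proposal itself describes returning a TimeoutException).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class RemoteListOffsetsSketch {
    // Hypothetical marker for a partition whose lookup did not finish in time.
    public static final long OFFSET_ON_TIMEOUT = -1L;

    // Assumption: a negative TimeoutMs means the client did not supply one,
    // so the broker-side default (remote.list.offsets.request.timeout.ms) applies.
    public static long resolveTimeoutMs(int requestTimeoutMs, long brokerDefaultTimeoutMs) {
        return requestTimeoutMs >= 0 ? requestTimeoutMs : brokerDefaultTimeoutMs;
    }

    // Creates one task per partition on the given pool (standing in for the
    // remote-log-reader threads) so that the lookups can run in parallel.
    public static Map<Integer, Long> listOffsets(List<Integer> partitions,
                                                 Function<Integer, Long> remoteLookup,
                                                 long timeoutMs,
                                                 ExecutorService pool) {
        Map<Integer, CompletableFuture<Long>> tasks = new LinkedHashMap<>();
        for (Integer partition : partitions) {
            CompletableFuture<Long> task = CompletableFuture
                .<Long>supplyAsync(() -> remoteLookup.apply(partition), pool)
                // Complete with the sentinel if the lookup exceeds the timeout.
                .completeOnTimeout(OFFSET_ON_TIMEOUT, timeoutMs, TimeUnit.MILLISECONDS);
            tasks.put(partition, task);
        }
        // Collect the results; the sketch blocks here only to stay simple,
        // whereas a purgatory would complete the response via a callback.
        Map<Integer, Long> result = new LinkedHashMap<>();
        tasks.forEach((partition, task) -> result.put(partition, task.join()));
        return result;
    }
}
```

`completeOnTimeout` requires Java 9 or later. Because each partition has its own future, one slow partition does not delay the lookups for the others, only the final collection step.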


ListOffsetsRequest.json
{
  "apiKey": 2,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ListOffsetsRequest",
  // Version 1 removes MaxNumOffsets.  From this version forward, only a single
  // offset can be returned.
  //
  // Version 2 adds the isolation level, which is used for transactional reads.
  //
  // Version 3 is the same as version 2.
  //
  // Version 4 adds the current leader epoch, which is used for fencing.
  //
  // Version 5 is the same as version 4.
  //
  // Version 6 enables flexible versions.
  //
  // Version 7 enables listing offsets by max timestamp (KIP-734).
  //
  // Version 8 enables listing offsets by local log start offset (KIP-405).
  //
  // Version 9 enables listing offsets by last tiered offset (KIP-1005).
  //
  // Version 10 enables async remote list offsets support (KIP-1075)
  "validVersions": "0-10",
  "deprecatedVersions": "0",
  "flexibleVersions": "6+",
  "latestVersionUnstable": false,
  "fields": [
    { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The broker ID of the requester, or -1 if this request is being made by a normal consumer." },
    { "name": "IsolationLevel", "type": "int8", "versions": "2+",
      "about": "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records" },
    { "name": "Topics", "type": "[]ListOffsetsTopic", "versions": "0+",
      "about": "Each topic in the request.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]ListOffsetsPartition", "versions": "0+",
        "about": "Each partition in the request.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "4+", "default": "-1", "ignorable": true,
          "about": "The current leader epoch." },
        { "name": "Timestamp", "type": "int64", "versions": "0+",
          "about": "The current timestamp." },
        { "name": "MaxNumOffsets", "type": "int32", "versions": "0", "default": "1",
          "about": "The maximum number of offsets to report." }
      ]}
    ]},
    { "name": "TimeoutMs", "type": "int32", "versions": "10+", "default": "-1",
      "about": "The timeout to await a response in milliseconds." }
  ]
}


Note:
To maintain the existing behavior:

  1. AdminClient will supply the "default.api.timeout.ms" configured on the client in the ListOffsetsRequest.
  2. Consumer will supply the "request.timeout.ms" in the ListOffsetsRequest.

Compatibility, Deprecation, and Migration Plan

...

There won't be any behavior change for existing clients. For older clients, to increase the timeout for remote LIST_OFFSETS, the user has to change the timeout on both the server and the client for it to take effect. Newer clients can supply the request timeout in the ListOffsetsRequest.


  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

...