Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-15859 

...

The time taken to read the indexes from remote storage is non-deterministic, and the query is handled by the request-handler threads. If there are many LIST_OFFSETS queries and most of the request-handler threads are busy handling them, other high-priority requests such as FETCH and PRODUCE can starve and queue up, which increases the latency of producing and consuming messages. To overcome this, we plan to introduce a delayed remote list offsets purgatory that delegates the request from the request-handler threads to the remote-log-reader threads. The request is then handled asynchronously, freeing up the request-handler threads to handle other high-priority requests.
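The hand-off described above can be illustrated with a minimal sketch. It is not the broker's implementation: it uses a plain CompletableFuture with a timeout in place of the purgatory, and every name in it (AsyncListOffsetsSketch, lookupAll, remoteLookup, offsetReaderPool) is hypothetical. The point is only that the calling thread submits the work and returns immediately, while a separate pool performs the slow remote lookups.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical names throughout; this is an illustration, not Kafka code.
final class AsyncListOffsetsSketch {

    // Submits one lookup task per partition to a separate pool and returns at once,
    // so the calling (request-handler) thread is free to serve other requests.
    // The returned future fails with a TimeoutException if timeoutMs elapses first.
    static CompletableFuture<List<Long>> lookupAll(List<String> partitions,
                                                   Function<String, Long> remoteLookup,
                                                   ExecutorService offsetReaderPool,
                                                   long timeoutMs) {
        List<CompletableFuture<Long>> tasks = partitions.stream()
            .map(p -> CompletableFuture.supplyAsync(() -> remoteLookup.apply(p), offsetReaderPool))
            .collect(Collectors.toList());
        return CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> tasks.stream()
                .map(CompletableFuture::join)   // all tasks are complete at this point
                .collect(Collectors.toList()))
            .orTimeout(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            // Stand-in lookup: returns the length of the partition name.
            List<Long> offsets = lookupAll(List.of("topic-0", "topic-1"),
                    p -> (long) p.length(), pool, 30_000L).join();
            System.out.println(offsets);
        } finally {
            pool.shutdown();
        }
    }
}
```

In this sketch the timeout plays the role of request expiry: if the lookups do not finish in time, the caller sees a TimeoutException instead of holding a thread indefinitely.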

Public Interfaces

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

...

  • The broker configs below will be added to the RemoteLogManagerConfig:
    • remote.list.offsets.request.timeout.ms: defines the timeout for remote list offsets requests. This will be a dynamic config and defaults to 30 seconds.
    • remote.log.offset.reader.threads: size of the dedicated thread pool that handles remote LIST_OFFSETS requests. This will be a dynamic config.
    • remote.log.offset.read.max.pending.tasks: defines the remote-offset reader thread pool queue size. This will be a static config.
  • Add a new maxTimestamp API in the RemoteLogMetadataManager to return the maximum timestamp across all the uploaded segments.
  • Below are the new metrics required for this KIP:
    • Metrics to emit the remote-log offset reader thread pool task queue size and average idle percentage:
      • org.apache.kafka.storage.internals.log:type=RemoteStorageThreadPool,name=RemoteLogOffsetReaderTaskQueueSize
      • org.apache.kafka.storage.internals.log:type=RemoteStorageThreadPool,name=RemoteLogOffsetReaderAvgIdlePercent
    • Metric to emit the expired DelayedRemoteListOffsets requests:
      • kafka.server:type=DelayedRemoteListOffsetsMetrics,name=ExpiresPerSec,topic=([-.\w]+),partition=([0-9]+)
    • PurgatoryName = RemoteListOffsets
  • A new TimeoutMs field will be added to the ListOffsetsRequest to convey the client timeout to the server.

Proposed Changes

The LIST_OFFSETS API is invoked by:

...

KafkaConsumer internally sends one or more requests, depending on the configured request-timeout and default-api-timeout. For example, if the consumer is configured with a default-api-timeout of 5 minutes and a request-timeout of 30 seconds, and the server takes 50 seconds to process the LIST_OFFSETS request, then the consumer internally sends 10 LIST_OFFSETS requests, one every 30 seconds, before failing with a TimeoutException after 5 minutes.
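The arithmetic behind that example can be checked with a small sketch. The class and method names are hypothetical, and the model is deliberately simplified: it assumes every attempt times out on the client after the request timeout and it ignores retry backoff.

```java
// Hypothetical helper; a simplified model of the retry arithmetic, not client code.
final class ListOffsetsRetryMath {

    // Number of requests sent before the overall API timeout is exhausted.
    // If the server answers within the request timeout, a single request suffices.
    static long attempts(long defaultApiTimeoutMs, long requestTimeoutMs, long serverProcessingMs) {
        if (serverProcessingMs <= requestTimeoutMs) {
            return 1;
        }
        // Ceiling division: a final, partially elapsed attempt still counts as a request.
        return (defaultApiTimeoutMs + requestTimeoutMs - 1) / requestTimeoutMs;
    }

    public static void main(String[] args) {
        // 5 minutes overall, 30 seconds per request, server needs 50 seconds.
        System.out.println(attempts(300_000L, 30_000L, 50_000L)); // prints 10
    }
}
```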

...

  1. We propose a new dynamic broker config, remote.list.offsets.request.timeout.ms, which defaults to 30 seconds. If a remote LIST_OFFSETS request cannot be served within the timeout, a TimeoutException is returned to the caller. The client cannot adjust this server-side timeout. The cluster operator can increase the remote LIST_OFFSETS request timeout when the remote storage is degraded. We can emit an expired DelayedRemoteListOffsets metric, based on which the cluster operator/admin can increase the timeout on the server.
  2. A new field "TimeoutMs" will be added to the ListOffsetsRequest. Newer clients can supply the maximum time to wait for a response from the server. Adding the timeout to the ListOffsetsRequest gives clients room to pick a suitable wait time. For older clients, and when the timeout is not supplied, we will use the server config "remote.list.offsets.request.timeout.ms".
  3. Today, one LIST_OFFSETS request is served by one request-handler thread. When there are multiple partitions/topics in the request, the partitions are processed serially. With the DelayedRemoteListOffsets purgatory, we will create a task for each partition in the request and add it to the queue. The remote-log-reader threads pick up the tasks when they are available, so the request is served asynchronously and in parallel.
  4. Similar to the remote-log-reader thread pool, we will define one more dedicated thread pool to serve remote LIST_OFFSETS requests, to avoid noisy-neighbour issues between the remote FETCH and LIST_OFFSETS requests. The default number of threads is 5 per broker. The thread pool size can be increased or decreased dynamically.
  5. A new API will be added in RemoteLogMetadataManager to find the maximum record timestamp of all the uploaded remote log segments. Each segment contains the maximum timestamp seen so far, and the same information is also propagated in the RemoteLogMetadata events. This API reduces the timestamps across all the uploaded remote log segments (COPY_SEGMENT_FINISHED) to the maximum timestamp seen so far. It will be used to decide whether to read the log from the earliest-offset or from the earliest-local-offset when handling a LIST_OFFSETS request for a given valid timestamp.
    1. If the target timestamp is less than or equal to the maximum-remote-timestamp, then we start scanning the log from the earliest offset, i.e. from remote to local storage.
    2. If the target timestamp is greater than the maximum-remote-timestamp, then we start scanning the log from the earliest-local offset, i.e. only in local storage.
    3. If the maximum remote timestamp is NULL, then we scan the log from the earliest offset, i.e. from remote to local storage.
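The three rules in item 5 can be sketched as a small decision function. This is an illustration only: the names ScanStartSketch, ScanStart and choose are hypothetical, and an empty OptionalLong stands in for the NULL maximum remote timestamp.

```java
import java.util.OptionalLong;

// Hypothetical names; a sketch of the scan-start decision, not broker code.
final class ScanStartSketch {

    enum ScanStart { EARLIEST_OFFSET, EARLIEST_LOCAL_OFFSET }

    static ScanStart choose(long targetTimestamp, OptionalLong maxRemoteTimestamp) {
        // Rule 3: unknown maximum remote timestamp, scan from remote to local storage.
        if (!maxRemoteTimestamp.isPresent()) {
            return ScanStart.EARLIEST_OFFSET;
        }
        // Rule 1: the target may be in a remote segment, scan from remote to local storage.
        if (targetTimestamp <= maxRemoteTimestamp.getAsLong()) {
            return ScanStart.EARLIEST_OFFSET;
        }
        // Rule 2: the target is newer than anything uploaded, scan local storage only.
        return ScanStart.EARLIEST_LOCAL_OFFSET;
    }

    public static void main(String[] args) {
        System.out.println(choose(100L, OptionalLong.of(200L)));  // EARLIEST_OFFSET
        System.out.println(choose(300L, OptionalLong.of(200L)));  // EARLIEST_LOCAL_OFFSET
        System.out.println(choose(300L, OptionalLong.empty()));   // EARLIEST_OFFSET
    }
}
```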

...

RemoteLogMetadataManager (Java):

...

ListOffsetsRequest.json:
{
  "apiKey": 2,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ListOffsetsRequest",
  // Version 1 removes MaxNumOffsets.  From this version forward, only a single
  // offset can be returned.
  //
  // Version 2 adds the isolation level, which is used for transactional reads.
  //
  // Version 3 is the same as version 2.
  //
  // Version 4 adds the current leader epoch, which is used for fencing.
  //
  // Version 5 is the same as version 4.
  //
  // Version 6 enables flexible versions.
  //
  // Version 7 enables listing offsets by max timestamp (KIP-734).
  //
  // Version 8 enables listing offsets by local log start offset (KIP-405).
  //
  // Version 9 enables listing offsets by last tiered offset (KIP-1005).
  //
  // Version 10 enables async remote list offsets support (KIP-1075)
  "validVersions": "0-10",
  "deprecatedVersions": "0",
  "flexibleVersions": "6+",
  "latestVersionUnstable": false,
  "fields": [
    { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The broker ID of the requester, or -1 if this request is being made by a normal consumer." },
    { "name": "IsolationLevel", "type": "int8", "versions": "2+",
      "about": "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records" },
    { "name": "Topics", "type": "[]ListOffsetsTopic", "versions": "0+",
      "about": "Each topic in the request.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]ListOffsetsPartition", "versions": "0+",
        "about": "Each partition in the request.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "4+", "default": "-1", "ignorable": true,
          "about": "The current leader epoch." },
        { "name": "Timestamp", "type": "int64", "versions": "0+",
          "about": "The current timestamp." },
        { "name": "MaxNumOffsets", "type": "int32", "versions": "0", "default": "1",
          "about": "The maximum number of offsets to report." }
      ]}
    ]},
    { "name": "TimeoutMs", "type": "int32", "versions": "10+", "default": "-1",
      "about": "The timeout to await a response in milliseconds." }
  ]
}


Note:
To maintain the existing behavior:

  1. AdminClient will supply "default.api.timeout.ms" as the timeout in the ListOffsetsRequest, and
  2. Consumer will supply "request.timeout.ms" as the timeout in the ListOffsetsRequest.
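How the broker might choose between the client-supplied TimeoutMs and the server config can be sketched as follows. This is an assumption-laden illustration: the names are hypothetical, and treating any negative TimeoutMs (the schema default is -1) as "not supplied" is an assumption rather than something the proposal spells out.

```java
// Hypothetical helper; a sketch of timeout resolution, not broker code.
final class ListOffsetsTimeoutSketch {

    // Assumption: a negative TimeoutMs (the schema default is -1) means the client
    // did not supply a timeout, as is the case for older clients.
    static long effectiveTimeoutMs(int requestTimeoutMs, long serverConfigTimeoutMs) {
        return requestTimeoutMs < 0 ? serverConfigTimeoutMs : requestTimeoutMs;
    }

    public static void main(String[] args) {
        long serverDefault = 30_000L; // remote.list.offsets.request.timeout.ms default
        System.out.println(effectiveTimeoutMs(-1, serverDefault));     // prints 30000
        System.out.println(effectiveTimeoutMs(60_000, serverDefault)); // prints 60000
    }
}
```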

Compatibility, Deprecation, and Migration Plan

...

       There won't be any behavior change for existing clients. For older clients, to increase the timeout for remote LIST_OFFSETS, the user has to change the timeout on both the server and the client for it to take effect. Newer clients can supply the request timeout in the ListOffsetsRequest.


  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

...