Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-15859 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Clients use the ListOffsets API to find the offset for a given timestamp. When remote storage is enabled for a topic, the broker has to fetch remote indexes, such as the offset-index and time-index, to serve the query. A single ListOffsets request can also contain queries for multiple topics/partitions.

The time taken to read the indexes from remote storage is non-deterministic, and the query is handled by the request-handler threads. If there are many LIST_OFFSETS queries and most of the request-handler threads are busy handling them, other high-priority requests such as FETCH and PRODUCE might starve and be queued. This can lead to higher latency in producing/consuming messages. To overcome this, we plan to introduce a delayed remote list offsets purgatory that delegates the request from the request-handler threads to the remote-log-reader threads. The request is then handled asynchronously, freeing up the request-handler threads to handle other high-priority requests.

Public Interfaces

  • The below broker config will be added to the RemoteLogManagerConfig
    • remote.list.offsets.request.timeout.ms: to define the timeout for the remote list offsets requests. This will be a dynamic config and defaults to 30 seconds.
  • A metric that reports expired DelayedRemoteListOffsets requests.
    • kafka.server:type=DelayedRemoteListOffsetsMetrics,name=ExpiresPerSec,topic=([-.\w]+),partition=([0-9]+)
  • PurgatoryName = RemoteListOffsets
  • A new TimeoutMs field will be added to the ListOffsetsRequest to convey the client timeout to the server.
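
As a rough illustration of how a monitoring tool might address the expired-request metric, the sketch below builds and matches the metric name with the JDK's javax.management.ObjectName. It assumes the metric is exposed over JMX in the usual domain:key=value form; the class name, helper methods, and the sample topic are hypothetical and not part of this KIP.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

public class ExpiredListOffsetsMetricName {
    // Assumption: the metric follows the standard JMX "domain:key=value,..." form.
    private static final String PREFIX =
        "kafka.server:type=DelayedRemoteListOffsetsMetrics,name=ExpiresPerSec";

    // Builds the per-partition metric name (hypothetical helper).
    static ObjectName metricName(String topic, int partition) {
        try {
            return new ObjectName(PREFIX + ",topic=" + topic + ",partition=" + partition);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
    }

    // Returns true if the given name is one of the ExpiresPerSec metrics,
    // whatever its topic and partition tags are.
    static boolean isExpiresMetric(ObjectName name) {
        try {
            ObjectName pattern = new ObjectName(PREFIX + ",*");
            return pattern.apply(name);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        ObjectName name = metricName("my-topic", 0); // sample topic, for illustration only
        System.out.println(name.getKeyProperty("topic"));
        System.out.println(isExpiresMetric(name));
    }
}
```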

Proposed Changes

The LIST_OFFSETS API is invoked by:

  1. KafkaConsumer#offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)
  2. KafkaConsumer#offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout) 
  3. Admin#listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets) and
  4. Admin#listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options)

Both KafkaConsumer and Admin set the default api-timeout to 60 seconds and the default request-timeout to 30 seconds. The server has no upper bound on the time it spends handling a LIST_OFFSETS request, and the client does not convey any timeout in the ListOffsetsRequest.

Current behavior

Depending on the configured request-timeout and default-api-timeout, KafkaConsumer internally sends one or more requests. For example, if the consumer is configured with a default-api-timeout of 5 minutes and a request-timeout of 30 seconds, and the server takes 50 seconds to process the LIST_OFFSETS request, then the consumer internally sends 10 LIST_OFFSETS requests, one every 30 seconds, before failing with a TimeoutException after 5 minutes.

Admin sends only one request and waits for up to default-api-timeout. For example, if the admin client is configured with a default-api-timeout of 5 minutes and a request-timeout of 30 seconds, and the server takes 50 seconds to process the LIST_OFFSETS request, then the admin client sends a single LIST_OFFSETS request and receives the response from the server after 50 seconds.
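
The consumer arithmetic in the example above can be checked with a small sketch. It is a simplified model that ignores retry backoff and network time, and the class and method names are hypothetical.

```java
public class ConsumerListOffsetsAttempts {
    // Simplified model: each attempt is abandoned after requestTimeoutMs, and the
    // consumer keeps re-sending until apiTimeoutMs has elapsed. Retry backoff and
    // network time are ignored.
    static long requestsSent(long apiTimeoutMs, long requestTimeoutMs, long serverTimeMs) {
        if (serverTimeMs <= requestTimeoutMs) {
            return 1; // the first attempt is answered in time
        }
        // Every attempt times out, so count attempts with a ceiling division.
        return (apiTimeoutMs + requestTimeoutMs - 1) / requestTimeoutMs;
    }

    public static void main(String[] args) {
        // default-api-timeout = 5 min, request-timeout = 30 s, server needs 50 s
        System.out.println(requestsSent(300_000, 30_000, 50_000)); // 10
    }
}
```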


Proposal

  1. We propose a new dynamic broker config, remote.list.offsets.request.timeout.ms, which defaults to 30 seconds. If a remote LIST_OFFSETS request cannot be served within the timeout, a TimeoutException is returned to the caller. The cluster operator can increase the remote LIST_OFFSETS request timeout when the remote storage is degraded. The broker emits a metric for expired DelayedRemoteListOffsets requests, which the cluster operator/admin can use to decide when to increase the timeout on the server.
  2. A new field "TimeoutMs" will be added to the ListOffsetsRequest. Newer clients can supply the maximum time to wait for a response from the server, which lets them pick a suitable wait time. For older clients, and whenever the timeout is not supplied, we will use the server config "remote.list.offsets.request.timeout.ms".
  3. Currently, one LIST_OFFSETS request is served by one request-handler thread, and when the request contains multiple topics/partitions, they are processed serially. With the DelayedRemoteListOffsets purgatory, we will create a task for each partition in the request and add it to the queue. The remote-log-reader threads pick up the tasks as they become available, so the request is served asynchronously and in parallel.
  4. We will be reusing the remote-log-reader thread pool to process the remote LIST_OFFSETS requests.
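
The per-partition delegation described in the list above can be sketched with plain JDK concurrency utilities. This is only an illustration of the idea, not the broker implementation: the executor stands in for the remote-log-reader thread pool, the lookup function stands in for the remote index read, and all names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class AsyncListOffsetsSketch {
    // Submits one task per partition to the reader pool and returns immediately.
    // A partition whose lookup does not finish within timeoutMs is reported as TIMEOUT.
    static CompletableFuture<Map<String, String>> lookupAll(List<String> partitions,
                                                            Function<String, Long> remoteLookup,
                                                            ExecutorService readerPool,
                                                            long timeoutMs) {
        Map<String, CompletableFuture<String>> pending = new LinkedHashMap<>();
        for (String tp : partitions) {
            pending.put(tp, CompletableFuture
                .supplyAsync(() -> "offset=" + remoteLookup.apply(tp), readerPool)
                .completeOnTimeout("TIMEOUT", timeoutMs, TimeUnit.MILLISECONDS));
        }
        // Assemble the response once every partition has either a result or a timeout.
        return CompletableFuture
            .allOf(pending.values().toArray(new CompletableFuture[0]))
            .thenApply(ignored -> {
                Map<String, String> results = new LinkedHashMap<>();
                pending.forEach((tp, f) -> results.put(tp, f.join()));
                return results;
            });
    }

    public static void main(String[] args) {
        ExecutorService readerPool = Executors.newFixedThreadPool(2); // stand-in pool
        Function<String, Long> slowForT1 = tp -> {
            if (tp.equals("t-1")) {
                try {
                    Thread.sleep(500); // simulate a degraded remote store
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            return 42L;
        };
        Map<String, String> response =
            lookupAll(List.of("t-0", "t-1"), slowForT1, readerPool, 100).join();
        System.out.println(response);
        readerPool.shutdownNow();
    }
}
```

In this sketch the calling thread only blocks in main, for demonstration purposes; the point of the returned future is that the submitting thread is free as soon as the tasks are queued. With the sample inputs, t-1 is expected to be reported as TIMEOUT while t-0 returns its offset.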


ListOffsetsRequest.json
{
  "apiKey": 2,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ListOffsetsRequest",
  // Version 1 removes MaxNumOffsets.  From this version forward, only a single
  // offset can be returned.
  //
  // Version 2 adds the isolation level, which is used for transactional reads.
  //
  // Version 3 is the same as version 2.
  //
  // Version 4 adds the current leader epoch, which is used for fencing.
  //
  // Version 5 is the same as version 4.
  //
  // Version 6 enables flexible versions.
  //
  // Version 7 enables listing offsets by max timestamp (KIP-734).
  //
  // Version 8 enables listing offsets by local log start offset (KIP-405).
  //
  // Version 9 enables listing offsets by last tiered offset (KIP-1005).
  //
  // Version 10 enables async remote list offsets support (KIP-1075)
  "validVersions": "0-10",
  "deprecatedVersions": "0",
  "flexibleVersions": "6+",
  "latestVersionUnstable": false,
  "fields": [
    { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The broker ID of the requester, or -1 if this request is being made by a normal consumer." },
    { "name": "IsolationLevel", "type": "int8", "versions": "2+",
      "about": "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records" },
    { "name": "Topics", "type": "[]ListOffsetsTopic", "versions": "0+",
      "about": "Each topic in the request.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]ListOffsetsPartition", "versions": "0+",
        "about": "Each partition in the request.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "4+", "default": "-1", "ignorable": true,
          "about": "The current leader epoch." },
        { "name": "Timestamp", "type": "int64", "versions": "0+",
          "about": "The current timestamp." },
        { "name": "MaxNumOffsets", "type": "int32", "versions": "0", "default": "1",
          "about": "The maximum number of offsets to report." }
      ]}
    ]},
    { "name": "TimeoutMs", "type": "int32", "versions": "10+", "default": "-1",
      "about": "The timeout to await a response in milliseconds." }
  ]
}
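
One way to read the TimeoutMs default of -1 in the schema above is as "not supplied", in which case the broker config applies. The sketch below illustrates that fallback. It assumes the broker does not cap a client-supplied value, which this KIP does not specify, and the class, constant, and method names are hypothetical.

```java
public class ListOffsetsTimeoutResolver {
    // Matches the "default": "-1" of the TimeoutMs field in the schema above.
    static final int TIMEOUT_NOT_SUPPLIED = -1;

    // Picks the timeout for a remote LIST_OFFSETS request.
    // Assumption: a client-supplied value is used as-is (no capping by the broker).
    static long effectiveTimeoutMs(int requestTimeoutMs, long brokerConfigTimeoutMs) {
        return requestTimeoutMs == TIMEOUT_NOT_SUPPLIED ? brokerConfigTimeoutMs : requestTimeoutMs;
    }

    public static void main(String[] args) {
        long brokerDefault = 30_000L; // remote.list.offsets.request.timeout.ms default
        System.out.println(effectiveTimeoutMs(TIMEOUT_NOT_SUPPLIED, brokerDefault)); // 30000
        System.out.println(effectiveTimeoutMs(60_000, brokerDefault));               // 60000
    }
}
```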


Note:
To preserve the existing behavior:

  1. AdminClient will supply "default.api.timeout.ms" as the timeout in the ListOffsetsRequest.
  2. Consumer will supply "request.timeout.ms" as the timeout in the ListOffsetsRequest.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?

       There won't be any behavior change for existing clients. For older clients, to increase the timeout for remote LIST_OFFSETS requests, users have to change the timeout on both the server and the client for it to take effect. Newer clients can supply the request timeout in the ListOffsetsRequest.


  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan


  • The patch will be covered with unit and integration tests. 

Rejected Alternatives


  1. We can add a new timeout attribute to the ListOffsetsRequest. With this, only new clients would be able to specify the timeout. For requests from older clients, we would not know what timeout to set.