Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: KAFKA-15859 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Clients use the ListOffsets API to find the offset for a given timestamp. When remote storage is enabled for a topic, the broker has to fetch remote indexes, such as the offset-index and time-index, to serve the query. A single ListOffsets request can also contain queries for multiple topics/partitions.

The time taken to read the indexes from remote storage is non-deterministic, and the query is handled by the request-handler threads. If many LIST_OFFSETS requests arrive and occupy most of the request-handler threads, other high-priority requests such as FETCH and PRODUCE can be queued and starved, which increases the latency of producing and consuming messages. To overcome this, we plan to introduce a delayed remote list offsets purgatory that delegates the request from the request-handler threads to the remote-log-reader threads. The request is then handled asynchronously, freeing up the request-handler threads to serve other high-priority requests.

Public Interfaces

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

This KIP makes the following public interface changes:

  • Introduce a new dynamic broker config, remote.list.offsets.request.timeout.ms, in RemoteLogManagerConfig to define the timeout for remote list offsets requests.
  • Add a metric that reports expired DelayedRemoteListOffsets requests:
    • kafka.server:type=DelayedRemoteListOffsetsMetrics,name=ExpiresPerSec,topic=([-.\w]+)

Proposed Changes

The LIST_OFFSETS API is invoked by:

  1. KafkaConsumer#offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)
  2. KafkaConsumer#offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout) 
  3. Admin#listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets) and
  4. Admin#listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options)

Both KafkaConsumer and Admin define a default api-timeout of 60 seconds and a default request-timeout of 30 seconds. There is no upper bound on the time the server may spend handling a LIST_OFFSETS request, and the client does not convey any timeout in the request (ListOffsetsRequest.json).

Current behavior

KafkaConsumer sends one or more requests internally, depending on the configured request-timeout and default-api-timeout. For example, if the consumer is configured with a default-api-timeout of 5 minutes and a request-timeout of 30 seconds, and the server takes 50 seconds to process the LIST_OFFSETS request, then the consumer internally sends 10 LIST_OFFSETS requests before failing with a TimeoutException after 5 minutes.

Admin sends only one request and waits for up to default-api-timeout. For example, if the admin is configured with a default-api-timeout of 5 minutes and a request-timeout of 30 seconds, and the server takes 50 seconds to process the LIST_OFFSETS request, then the admin sends only one LIST_OFFSETS request and receives the response from the server after 50 seconds.
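
The request count in the consumer example above follows from simple arithmetic. The sketch below is illustrative only and is not Kafka client code: the class and method names are hypothetical, and it assumes that each attempt runs for the full request-timeout and is retried immediately, ignoring any retry backoff.

```java
import java.time.Duration;

/** Illustrative arithmetic only; the names here are hypothetical, not Kafka APIs. */
class ListOffsetsRetrySketch {

    /**
     * Upper bound on the number of attempts that fit into the API timeout when
     * every attempt times out after requestTimeout (assumption: no retry backoff).
     */
    static long maxAttempts(Duration apiTimeout, Duration requestTimeout) {
        long apiMs = apiTimeout.toMillis();
        long requestMs = requestTimeout.toMillis();
        if (requestMs <= 0) {
            throw new IllegalArgumentException("requestTimeout must be positive");
        }
        return (apiMs + requestMs - 1) / requestMs; // ceiling division
    }

    /** An attempt can only succeed if the server answers within the request timeout. */
    static boolean attemptCanSucceed(Duration serverTime, Duration requestTimeout) {
        return serverTime.compareTo(requestTimeout) <= 0;
    }

    public static void main(String[] args) {
        Duration apiTimeout = Duration.ofMinutes(5);
        Duration requestTimeout = Duration.ofSeconds(30);
        Duration serverTime = Duration.ofSeconds(50);

        System.out.println(maxAttempts(apiTimeout, requestTimeout));       // 10
        System.out.println(attemptCanSucceed(serverTime, requestTimeout)); // false
    }
}
```

Under these assumptions, a server that needs 50 seconds never answers within a 30-second request-timeout, so every one of the 10 attempts is wasted work on the broker.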


Proposal

  1. We could add a new timeout attribute to the ListOffsetsRequest. However, only new clients would be able to specify the timeout. For requests from older clients, we would not know what timeout to apply.
  2. We propose a new dynamic broker config, remote.list.offsets.request.timeout.ms, which defaults to 30 seconds. If a remote LIST_OFFSETS request cannot be served within the timeout, a TimeoutException is returned to the caller. The client cannot adjust this timeout. The cluster operator can increase the remote LIST_OFFSETS request timeout when the remote storage is degraded. We can emit a metric for expired DelayedRemoteListOffsets requests, based on which the cluster operator/admin can increase the timeout on the server.
  3. Currently, one LIST_OFFSETS request is served by one request-handler thread. When the request contains multiple partitions/topics, the partitions are processed serially. With the DelayedRemoteListOffsets purgatory, we will create a task for each partition in the request and add it to the queue. The remote-log-reader threads pick up the tasks as they become available, so the request is served asynchronously and in parallel.
  4. Similar to the remote-log-reader thread pool, we could define one more dedicated thread pool to serve remote LIST_OFFSETS requests. However, we anticipate that a separate thread pool for a single request type would be overkill.
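
The per-partition task model and the server-side timeout described above can be sketched with plain java.util.concurrent primitives. This is a minimal illustration under stated assumptions, not the broker implementation: the class, the method names, the simulated lookup, and the TIMED_OUT sentinel are all hypothetical, and the actual proposal returns a TimeoutException for expired requests rather than a sentinel value.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical sketch; none of these names are Kafka APIs. */
class DelayedRemoteListOffsetsSketch {

    /** Sentinel standing in for a per-partition timeout error (assumption). */
    static final long TIMED_OUT = -1L;

    /**
     * Submits one lookup task per partition to the shared reader pool and returns
     * immediately, so the calling (request-handler) thread is not blocked.
     * A partition whose lookup exceeds timeoutMs completes with TIMED_OUT.
     */
    static CompletableFuture<Map<String, Long>> listOffsetsAsync(
            List<String> partitions,
            Function<String, Long> remoteLookup,
            ExecutorService remoteLogReaderPool,
            long timeoutMs) {
        Map<String, CompletableFuture<Long>> pending = new LinkedHashMap<>();
        for (String partition : partitions) {
            pending.put(partition, CompletableFuture
                    .supplyAsync(() -> remoteLookup.apply(partition), remoteLogReaderPool)
                    .completeOnTimeout(TIMED_OUT, timeoutMs, TimeUnit.MILLISECONDS));
        }
        // Complete the overall response once every partition has a result or has expired.
        return CompletableFuture
                .allOf(pending.values().toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    Map<String, Long> result = new LinkedHashMap<>();
                    pending.forEach((partition, future) -> result.put(partition, future.join()));
                    return result;
                });
    }

    /** Simulated remote index read: partitions named "slow-*" take 5 seconds. */
    static Long simulatedLookup(String partition) {
        if (partition.startsWith("slow")) {
            try {
                Thread.sleep(5_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return 42L;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Map<String, Long> offsets = listOffsetsAsync(
                    List.of("fast-0", "slow-0", "fast-1"),
                    DelayedRemoteListOffsetsSketch::simulatedLookup,
                    pool,
                    500).join();
            System.out.println(offsets);
        } finally {
            pool.shutdownNow(); // interrupts the still-running slow lookup
        }
    }
}
```

Note that a timed-out lookup keeps occupying its pool thread until it finishes or is interrupted, which is one reason the proposal pairs the timeout with a metric for expired requests.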

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?

       There won't be any behavior change for existing clients. To increase the timeout for remote LIST_OFFSETS requests, users have to change the timeout on both the server and the client for it to take effect.


  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in a few sentences how the KIP will be tested. We are mostly interested in system tests (since unit tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

  • The patch will be covered with unit and integration tests. 

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
