Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: KAFKA-15859 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

To find the offset for a given timestamp, the client uses the ListOffsets API. When remote storage is enabled for a topic, the broker has to fetch remote indexes such as the offset-index and time-index to serve the query. A single ListOffsets request can also contain queries for multiple topics/partitions.

The time taken to read the indexes from remote storage is non-deterministic, and the query is handled by the request-handler threads. If there are many LIST_OFFSETS queries and most of the request-handler threads are busy handling them, then other high-priority requests such as FETCH and PRODUCE might starve and be queued. This can lead to higher latency in producing/consuming messages. To overcome this, we plan to introduce a delayed remote list offsets purgatory that delegates the request from the request-handler threads to the remote-log-reader threads. The request is then handled asynchronously, freeing up the request-handler threads to handle other high-priority requests.

Public Interfaces


  • The below broker configs will be introduced in the RemoteLogManagerConfig
    • remote.list.offsets.request.timeout.ms: to define the timeout for the remote list offsets requests. This will be a dynamic config.
    • remote.log.offset.reader.threads: dedicated thread pool to handle remote LIST_OFFSETS requests. This will be a dynamic config.
    • remote.log.offset.read.max.pending.tasks: defines the remote-offset reader thread pool queue size. This will be a static config.
  • Add a new maxTimestamp API in the RemoteLogMetadataManager to compute the maximum timestamp across all the uploaded segments.
  • Below are the new metrics required for this KIP:
    • Metric to emit the remote-log offset reader thread pool task queue size and average idle percentage:
      • org.apache.kafka.storage.internals.log:type=RemoteStorageThreadPool,name=RemoteLogOffsetReaderTaskQueueSize

      • org.apache.kafka.storage.internals.log:type=RemoteStorageThreadPool,name=RemoteLogOffsetReaderAvgIdlePercent

    • Metric to emit the rate of expired DelayedRemoteListOffsets requests.
      • kafka.server:type=DelayedRemoteListOffsetsMetrics,name=ExpiresPerSec,topic=([-.\w]+)
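
As a rough illustration of how the thread-count and pending-task configs above could interact, the sketch below maps them onto a plain JDK ThreadPoolExecutor with a bounded queue. The class and helper names (OffsetReaderPoolSketch, newPool, countRejected) are hypothetical, and rejecting tasks when the queue is full is an assumption for the example, not something this KIP specifies.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class OffsetReaderPoolSketch {
    // Hypothetical helper: maps the two proposed configs onto a JDK executor.
    static ThreadPoolExecutor newPool(int threads, int maxPendingTasks) {
        return new ThreadPoolExecutor(
            threads, threads,                           // remote.log.offset.reader.threads
            0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(maxPendingTasks),  // remote.log.offset.read.max.pending.tasks
            new ThreadPoolExecutor.AbortPolicy());      // assumption: reject when the queue is full
    }

    /** Submits `tasks` blocking tasks and returns how many were rejected. */
    static int countRejected(int threads, int maxPendingTasks, int tasks) {
        ThreadPoolExecutor pool = newPool(threads, maxPendingTasks);
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    // Each task blocks until released, simulating a slow remote index read.
                    pool.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
        } finally {
            release.countDown(); // unblock the workers
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 2 threads + 3 queue slots accept 5 tasks; the remaining 2 of 7 are rejected.
        System.out.println(countRejected(2, 3, 7));
    }
}
```

With 2 threads and 3 pending slots, at most 5 slow lookups are in flight or queued at once, which is why the task queue size and idle percentage metrics are useful for sizing the pool.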

Proposed Changes

The LIST_OFFSETS API is invoked by:

  1. KafkaConsumer#offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)
  2. KafkaConsumer#offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout) 
  3. Admin#listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets) and
  4. Admin#listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options)

Both KafkaConsumer and Admin define a default api-timeout of 60 seconds and a default request-timeout of 30 seconds. There is no upper bound on the time taken to handle a LIST_OFFSETS request on the server side, and the client does not convey any timeout in the ListOffsetsRequest.json request.

Current behavior

KafkaConsumer sends one or more requests internally, depending on the configured request-timeout and default-api-timeout. For example, if the consumer is configured with a default-api-timeout of 5 minutes and a request-timeout of 30 seconds, and the server takes 50 seconds to process the LIST_OFFSETS request, then the consumer internally sends 10 LIST_OFFSETS requests before failing with a TimeoutException after 5 minutes.

Admin sends only one request and waits up to the default-api-timeout. For example, if the admin is configured with a default-api-timeout of 5 minutes and a request-timeout of 30 seconds, and the server takes 50 seconds to process the LIST_OFFSETS request, then the admin sends only one LIST_OFFSETS request and receives the response from the server after 50 seconds.
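
The figure of 10 requests in the consumer example follows from dividing the api-timeout budget by the request-timeout. The sketch below spells out that arithmetic. It assumes every request times out on the client (the server needs 50 seconds, which is longer than the 30-second request-timeout) and ignores any retry backoff. The class and method names are hypothetical.

```java
import java.time.Duration;

class ListOffsetsRetryEstimate {
    /**
     * Upper bound on the LIST_OFFSETS requests a consumer sends when every
     * request times out: one request per request-timeout window until the
     * api-timeout budget is exhausted. Retry backoff is ignored.
     */
    static long maxRequests(Duration apiTimeout, Duration requestTimeout) {
        long api = apiTimeout.toMillis();
        long req = requestTimeout.toMillis();
        return (api + req - 1) / req; // ceiling division
    }

    public static void main(String[] args) {
        // 5 minutes / 30 seconds
        System.out.println(maxRequests(Duration.ofMinutes(5), Duration.ofSeconds(30))); // prints 10
    }
}
```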


Proposal

  1. We propose a new dynamic broker config, remote.list.offsets.request.timeout.ms, which defaults to 30 seconds. If a remote LIST_OFFSETS request cannot be served within the timeout, a TimeoutException is returned to the caller. The client cannot adjust this timeout. The cluster operator can increase the remote LIST_OFFSETS request timeout when the remote storage is degraded. We can emit an expired DelayedRemoteListOffsets metric, based on which the cluster operator/admin can increase the timeout on the server.
  2. Today, one LIST_OFFSETS request is served by one request-handler thread. When there are multiple partitions/topics in the request, the partitions are processed serially. With the DelayedRemoteListOffsets purgatory, we will create a task for each partition in the request and add it to the queue. The remote-log-reader threads pick up the tasks when they are available, so the request is served asynchronously and in parallel.
  3. Similar to the remote-log-reader thread pool, we will define one more dedicated thread pool to serve remote LIST_OFFSETS requests, to avoid noisy-neighbour issues between remote FETCH and LIST_OFFSETS requests.
  4. A new API will be added in RemoteLogMetadataManager to find the maximum record timestamp of all the uploaded remote log segments. Each segment contains the maximum timestamp seen so far in that segment, and the same information is also propagated in the RemoteLogMetadata events. This API reduces the timestamps across all the uploaded remote log segments to a single maximum timestamp. This API will be used to decide whether to read the log from the earliest-offset or from the earliest-local-offset when handling the LIST_OFFSETS request for a valid timestamp.
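
The per-partition fan-out with a server-side timeout described in points 1 to 3 can be approximated with JDK primitives. The sketch below is not the purgatory implementation: DelayedListOffsetsSketch, lookupAll and the TIMED_OUT marker are hypothetical names, partitions are plain strings, and the remote lookup is passed in as a function. It only shows one task per partition running on a dedicated pool while the calling thread returns immediately.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;

class DelayedListOffsetsSketch {
    static final long TIMED_OUT = -1L; // hypothetical marker for an expired partition lookup

    /**
     * Creates one lookup task per partition on the dedicated pool. The caller
     * (standing in for a request-handler thread) gets a future back immediately;
     * the future completes once every partition has a result or has timed out.
     */
    static CompletableFuture<Map<String, Long>> lookupAll(
            Map<String, Long> timestampsByPartition,
            BiFunction<String, Long, Long> remoteLookup,
            ExecutorService offsetReaderPool,
            long timeoutMs) {
        Map<String, CompletableFuture<Long>> futures = new LinkedHashMap<>();
        timestampsByPartition.forEach((partition, timestamp) ->
            futures.put(partition,
                CompletableFuture
                    .supplyAsync(() -> remoteLookup.apply(partition, timestamp), offsetReaderPool)
                    // Completes the future with the marker on expiry; the task itself is not cancelled.
                    .completeOnTimeout(TIMED_OUT, timeoutMs, TimeUnit.MILLISECONDS)));

        return CompletableFuture
            .allOf(futures.values().toArray(new CompletableFuture[0]))
            .thenApply(ignored -> {
                Map<String, Long> results = new LinkedHashMap<>();
                futures.forEach((partition, future) -> results.put(partition, future.join()));
                return results;
            });
    }
}
```

In this sketch a slow partition only delays its own entry up to the timeout, and the other partitions in the same request still get their offsets.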


RemoteLogMetadataManager
@InterfaceStability.Evolving
public interface RemoteLogMetadataManager extends Configurable, Closeable { 

    /**
     * Denotes the maximum timestamp of all the uploaded remote log segments for the given topic partition.
     * @param topicIdPartition topic partition
     * @return Maximum timestamp of all the uploaded remote log segments. Returns null if the RLMM is not initialized.
     * @throws RemoteStorageException if any storage-related error occurs.
     */
    default Long maxTimestamp(TopicIdPartition topicIdPartition) throws RemoteStorageException {
        return null;
    }
}
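
To make the intended use of maxTimestamp more concrete, the sketch below reduces per-segment maximum timestamps to one value and uses it to choose where the timestamp search starts. SegmentMeta is a hypothetical stand-in for the segment metadata, and the decision rule in shouldSearchRemote is our assumption of how the value could be used. The KIP text does not spell the rule out.

```java
import java.util.List;
import java.util.OptionalLong;

class MaxTimestampSketch {
    /** Hypothetical stand-in for per-segment metadata; only the fields used here. */
    static final class SegmentMeta {
        final long startOffset;
        final long maxTimestampMs;

        SegmentMeta(long startOffset, long maxTimestampMs) {
            this.startOffset = startOffset;
            this.maxTimestampMs = maxTimestampMs;
        }
    }

    /** Reduces the per-segment maximum timestamps to one partition-wide maximum. */
    static OptionalLong maxTimestamp(List<SegmentMeta> uploadedSegments) {
        return uploadedSegments.stream().mapToLong(s -> s.maxTimestampMs).max();
    }

    /**
     * Assumed rule: search from earliest-offset (remote storage) only when the
     * target timestamp can fall inside the uploaded segments; otherwise start
     * from earliest-local-offset and skip the remote read.
     */
    static boolean shouldSearchRemote(long targetTimestampMs, OptionalLong remoteMaxTimestampMs) {
        return remoteMaxTimestampMs.isPresent()
            && targetTimestampMs <= remoteMaxTimestampMs.getAsLong();
    }
}
```

Under this assumed rule, a query for a timestamp newer than everything uploaded never touches remote storage, which avoids the slow index fetch entirely for recent timestamps.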


Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?

       There won't be any behavior change for existing clients. To increase the timeout for remote LIST_OFFSETS, the user has to change the timeout on both the server and the client for it to take effect.


  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan


  • The patch will be covered with unit and integration tests. 

Rejected Alternatives


  1. We could add a new timeout attribute to the ListOffsetsRequest. With this, only new clients would be able to specify the timeout. For requests from older clients, we would not know what timeout to set for the request.