Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-15265

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Remote Log Manager (RLM) is one of the critical components in Kafka that helps support tiered storage. One of its responsibilities is to periodically upload rolled-over log segments from the local disk to the remote storage. It discovers the log segments eligible for upload and then uploads them. RLM creates one such task (RLMTask) for each topic partition managed by the broker and schedules them to run at a fixed frequency (every 30 seconds) using a ScheduledThreadPoolExecutor.

When an RLMTask identifies a list of segments to upload, it tries to upload all segments in the same run. As a result, if there are a huge number of eligible segments, the RLM task processes them as quickly as possible. Since multiple such tasks are running concurrently within the thread-pool executor, they end up consuming significant CPU, thereby affecting producer latencies. This phenomenon is often noticed when we enable tiered storage for existing topics on a cluster. 

Controlling the rate of segment uploads to remote storage would prevent over-utilization of CPU and cluster degradation. 

Similarly, the RLM also plays a role in serving read requests for log segments residing in the remote storage. When receiving a request for remote read, the RLM submits an async read task to its internal thread pool executor. The async task reads data from the remote storage that is then sent back in the fetch response. 

A large number of read requests requesting data residing on the remote storage could cause degradation of the remote storage. The large number of read requests may also cause over-utilization of CPU on the broker. This may happen when the majority of the consumers start reading from the earliest offset of their respective Kafka topics. To prevent such degradation, we should have a mechanism to control the rate at which log segments are read from the remote storage. 

Goals

  • The administrator should be able to configure an "upper bound" on the rate at which log segments are uploaded to the remote storage.
  • The administrator should be able to configure an "upper bound" on the rate at which log segments are read from the remote storage.

Public Interfaces

Broker Configuration Options

New broker properties will be added to configure quotas for both reading from the remote storage and writing to the remote storage. We will use the existing quota framework to keep track of the current read and write rates.

Configuration for write quotas:

  • remote.log.manager.copy.max.bytes.per.second
    • Type: Long
    • Mode: Dynamically configurable as cluster-default for all brokers in the cluster
    • Description: The default bytes per second quota allocated to remote upload of segments
    • Default value: Long.MAX_VALUE (unlimited)
  • remote.log.manager.copy.quota.window.num
    • Type: Integer
    • Mode: Static
    • Description: The number of samples to retain in memory for remote upload quotas
    • Default value: 61
  • remote.log.manager.copy.quota.window.size.seconds
    • Type: Integer
    • Mode: Static
    • Description: The time duration of each sample for remote upload quotas
    • Default value: 1

Configuration for read quotas:

  • remote.log.manager.fetch.max.bytes.per.second
    • Type: Long
    • Mode: Dynamically configurable as cluster-default for all brokers in the cluster
    • Description: The default bytes per second quota allocated for reading remote log segments
    • Default value: Long.MAX_VALUE (unlimited)
  • remote.log.manager.fetch.quota.window.num
    • Type: Integer
    • Mode: Static
    • Description: The number of samples to retain in memory for remote read quotas
    • Default value: 11
  • remote.log.manager.fetch.quota.window.size.seconds
    • Type: Integer
    • Mode: Static
    • Description: The time duration of each sample for remote read quotas
    • Default value: 1

The quotas (remote.log.manager.copy.max.bytes.per.second and remote.log.manager.fetch.max.bytes.per.second) are dynamically configurable. Users will be able to change these configs using either kafka-configs.sh or AdminClient by specifying the config name and the new value.

Example:

Using kafka-configs.sh

bin/kafka-configs.sh --bootstrap-server <bootstrap-server> --entity-type brokers --entity-default --alter --add-config remote.log.manager.copy.max.bytes.per.second=52428800

Using AdminClient

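import java.util.*;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

// 52428800 bytes/sec = 50 MiB/sec, the same value as in the kafka-configs.sh example.
ConfigEntry configEntry = new ConfigEntry("remote.log.manager.copy.max.bytes.per.second", "52428800");
AlterConfigOp alterConfigOp = new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET);
List<AlterConfigOp> alterConfigOps = Collections.singletonList(alterConfigOp);

// An empty resource name targets the cluster-wide default for all brokers.
ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER, "");
Map<ConfigResource, Collection<AlterConfigOp>> updateConfig = Collections.singletonMap(resource, alterConfigOps);
adminClient.incrementalAlterConfigs(updateConfig);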

Proposed Changes

This is an enhancement to the existing mechanism for reading/writing log segments from/to the remote storage. In the absence of a throttling mechanism, the remote storage can degrade when many fetch requests read data from it or when a large volume of writes is sent to it. Also, without such a mechanism, the CPU on the brokers can become overly busy, degrading the Kafka cluster and increasing producer latencies.

Throttling Data Writes

We add a new component, the RLM WriteQuotaManager, to manage quotas for remote writes. It is similar to other existing QuotaManagers (for example, ClientQuotaManager). It can be configured with the desired write quota, keeps track of the current usage, and can be used to check whether the quota is exhausted. We will use it to record the segment size whenever a segment is uploaded to the remote storage, so that it can track the rate of bytes uploaded. Before uploading a new segment, we can use it to check whether the current upload rate is within the quota.

RLM WriteQuotaManager

The WriteQuotaManager allows users to specify the following properties:

  • Write Quota in bytes per second (default: infinite) 
  • Number of samples to retain in memory (default: 61)
  • The timespan of each sample (default: 1 second)

The QuotaManager utilizes a rolling window to track the bytes uploaded every second. There is a time bucket for each sample and it stores the total bytes uploaded in the time bucket.

T1 | T2 | ... | T60 | T61

61 buckets of size 1 second (default). There are 60 whole buckets and one additional bucket is to track usage for the current window. 


QuotaManager supports the following two operations:

  • Record the uploaded bytes - The bucket corresponding to the current time is updated with the bytes uploaded. The bucket holds the total bytes of data uploaded in the associated time bucket.
  • Check if the write quota has been exceeded - It computes the average bytes uploaded per second across all time buckets and checks if the average is more than the specified quota. For example, if the timespan of each bucket is 1 second, the average across all buckets gives the average upload rate in bytes/sec, which can be compared with the quota specified in bytes/sec.
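The two operations above can be sketched as a small rolling-window tracker. This is only an illustration under stated assumptions, not Kafka's quota framework: the class name RollingByteRate and its methods are hypothetical, timestamps are passed in explicitly, and the average is always taken over the full window, which is a simplification.

```java
import java.util.Arrays;

/** Illustrative rolling-window byte-rate tracker (hypothetical, not a Kafka class). */
class RollingByteRate {
    private final long[] bucketBytes;  // total bytes recorded in each time bucket
    private final long[] bucketWindow; // absolute window number each slot currently holds
    private final long bucketSizeMs;
    private final double quotaBytesPerSec;

    RollingByteRate(int numSamples, int sampleSizeSeconds, double quotaBytesPerSec) {
        this.bucketBytes = new long[numSamples];
        this.bucketWindow = new long[numSamples];
        Arrays.fill(bucketWindow, -1L); // -1 marks a slot that has never been used
        this.bucketSizeMs = sampleSizeSeconds * 1000L;
        this.quotaBytesPerSec = quotaBytesPerSec;
    }

    /** Adds bytes to the bucket covering nowMs, resetting the slot if it holds stale data. */
    synchronized void record(long bytes, long nowMs) {
        long window = nowMs / bucketSizeMs;
        int slot = (int) (window % bucketBytes.length);
        if (bucketWindow[slot] != window) {
            bucketWindow[slot] = window;
            bucketBytes[slot] = 0L;
        }
        bucketBytes[slot] += bytes;
    }

    /** Average bytes/sec over the buckets that still fall inside the rolling window. */
    synchronized double rate(long nowMs) {
        long current = nowMs / bucketSizeMs;
        long oldest = current - bucketBytes.length + 1;
        long total = 0L;
        for (int i = 0; i < bucketBytes.length; i++) {
            if (bucketWindow[i] >= oldest && bucketWindow[i] <= current) {
                total += bucketBytes[i];
            }
        }
        double windowSeconds = bucketBytes.length * (bucketSizeMs / 1000.0);
        return total / windowSeconds;
    }

    /** True when the average rate over the window is above the configured quota. */
    synchronized boolean isQuotaExceeded(long nowMs) {
        return rate(nowMs) > quotaBytesPerSec;
    }
}
```

With 61 samples of 1 second each, recording 6100 bytes in a single bucket gives an average of 100 bytes/sec. A quota of 100 bytes/sec is then reached but not exceeded, and the recorded bytes stop counting once that bucket slides out of the window.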

The WriteQuotaManager is shared by all the threads of the ScheduledThreadPoolExecutor that is used to execute RLMTasks and keeps track of the total segment upload rate on the broker. The RLMTask will do the following to ensure the remote write quota specified for the cluster is respected:

  • In each run, RLMTask identifies all the log segments for the given topic partition that are eligible for upload. The task then attempts to upload the segments to remote storage in a loop.
  • Before uploading the next log segment, it checks whether the write quota has already been violated. If the quota has not been violated, it first uploads the log segment to the remote storage and then records the number of bytes uploaded with the WriteQuotaManager. Thus, the quota manager can see the updated view of quota utilization.
  • If the quota is already exhausted, the RLMTask waits until the write rate falls below the specified quota. Once the write rate falls, the task uploads the segment, records the number of bytes uploaded with the WriteQuotaManager, and moves on to the next segment.
  • This approach may cause starvation for low throughput topics, since the RLM task for high throughput topics may not give up the thread (the task waits until the write rate falls below the quota). Starvation may not be a problem, because RLM is still running at the maximum capacity to offload segments to remote, thus preventing the local disk from growing. However, if fairness is desirable, the RLM task should exit if it runs into a 'quota exceeded' error after having uploaded at least one segment in its run. This gives other RLM tasks a chance to be executed. They may run into the same error but will run once the quota utilization subsides.
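The steps above can be sketched as a simple loop. This is a hedged illustration only: ThrottledUploadLoop, WriteQuota, SegmentUploader, the fixed back-off, and the yieldWhenThrottled flag are hypothetical names introduced here, and segments are represented only by their size in bytes.

```java
import java.util.List;

/** Illustrative upload loop that respects a shared write quota (hypothetical names). */
class ThrottledUploadLoop {
    /** Assumed minimal view of the write quota manager. */
    interface WriteQuota {
        boolean isExceeded();
        void record(long bytes);
    }

    /** Assumed hook that copies one segment of the given size to remote storage. */
    interface SegmentUploader {
        void upload(long segmentSizeBytes);
    }

    /**
     * Uploads segments in order and returns how many were uploaded.
     * When yieldWhenThrottled is true, the loop exits on quota exhaustion once at
     * least one segment has been uploaded, so that other tasks can use the thread.
     */
    static int uploadAll(List<Long> segmentSizes, WriteQuota quota, SegmentUploader uploader,
                         long backoffMs, boolean yieldWhenThrottled) {
        int uploaded = 0;
        for (long size : segmentSizes) {
            // Wait until the upload rate is back within the quota.
            while (quota.isExceeded()) {
                if (yieldWhenThrottled && uploaded > 0) {
                    return uploaded;
                }
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return uploaded;
                }
            }
            uploader.upload(size);
            quota.record(size); // record only after the upload has completed
            uploaded++;
        }
        return uploaded;
    }
}
```

Polling with a fixed back-off keeps the sketch short. Waiting for a throttle time derived from the quota framework would avoid repeated checks.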

An RLMTask is also responsible for handling expired remote log segments for the associated topic partition. It cleans up those remote log segments that are expired and are no longer required. With the change in the behavior to block if the remote write quota is exhausted, clean-up of expired segments may get affected and may get stalled if the segment upload rate across the cluster is high causing excessive throttling. To solve this problem, we can break down the RLMTask into two smaller tasks - one for segment upload and the other for handling expired segments. The two tasks shall be executed in separate ThreadPoolExecutors. This will remove the impact of the throttling of segment uploads on segment expiration.
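A minimal sketch of this split, assuming one scheduled pool per kind of task, is shown here. The class name SplitRlmScheduling, the pool names, and the use of scheduleWithFixedDelay are illustrative choices, not the proposed implementation.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Illustrative scheduling of copy and expiration work on separate pools (hypothetical). */
class SplitRlmScheduling {
    private final ScheduledExecutorService copyPool;
    private final ScheduledExecutorService expirationPool;

    SplitRlmScheduling(int copyThreads, int expirationThreads) {
        this.copyPool = newPool(copyThreads, "rlm-copy");
        this.expirationPool = newPool(expirationThreads, "rlm-expiration");
    }

    private static ScheduledExecutorService newPool(int threads, String name) {
        // Daemon threads so that the sketch never keeps the JVM alive.
        return Executors.newScheduledThreadPool(threads, r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        });
    }

    /** Schedules both tasks; a copy task blocked on the quota cannot delay expiration. */
    void schedule(Runnable copyTask, Runnable expirationTask, long periodMs) {
        copyPool.scheduleWithFixedDelay(copyTask, 0L, periodMs, TimeUnit.MILLISECONDS);
        expirationPool.scheduleWithFixedDelay(expirationTask, 0L, periodMs, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        copyPool.shutdownNow();
        expirationPool.shutdownNow();
    }
}
```

Because the pools do not share threads, a copy task that is waiting for the write quota occupies only a copy thread, and expiration tasks continue to run on their own schedule.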

Throttling Data Reads

We shall add a new component RLM ReadQuotaManager to manage quotas for remote reads. It is similar to the WriteQuotaManager discussed in the previous section, but is meant for tracking reads instead of writes. It can be configured with the permitted read quota. It can then be used to record the quota consumed following a data read and also to check if the quota has already been exceeded.

RLM ReadQuotaManager

The ReadQuotaManager allows users to specify the following properties:

  • Read Quota in bytes per second (default: infinite) 
  • Number of samples to retain in memory (default: 11)
  • The timespan of each sample (default: 1 second)

When a fetch request requires remote data for a topic partition, the ReplicaManager will query the ReadQuotaManager to check whether the quota for reading remote data has already been exhausted. If the quota has not been exhausted, the ReplicaManager will continue to return the RemoteStorageFetchInfo that can be used to read the requested data from the remote storage. Once data from the remote storage is read, RLM records the number of bytes read with the ReadQuotaManager. Hence, the quota manager always has an updated view of quota utilization.

However, if the quota has already been exhausted, the ReplicaManager returns an empty RemoteStorageFetchInfo. As a result, the fetch request can still read data for the other topic partitions, but there will be no data for the topic partition requiring remote data as long as the read quota has been exhausted. Once the read rate falls below the permitted quota, fetch requests can resume serving remote data.
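The read path described above can be sketched as follows. The names RemoteReadGate, ReadQuota, and RemoteFetchInfo are hypothetical stand-ins introduced for illustration, and an empty Optional plays the role of the empty RemoteStorageFetchInfo.

```java
import java.util.Optional;

/** Illustrative gate for remote reads based on a broker-level read quota (hypothetical). */
class RemoteReadGate {
    /** Assumed minimal view of the read quota manager. */
    interface ReadQuota {
        boolean isExceeded();
        void record(long bytes);
    }

    /** Stand-in for the information needed to read a partition's data from remote storage. */
    static final class RemoteFetchInfo {
        final String topicPartition;
        final long fetchOffset;
        final int maxBytes;

        RemoteFetchInfo(String topicPartition, long fetchOffset, int maxBytes) {
            this.topicPartition = topicPartition;
            this.fetchOffset = fetchOffset;
            this.maxBytes = maxBytes;
        }
    }

    /** Returns fetch info only while the quota is not exhausted; otherwise empty. */
    static Optional<RemoteFetchInfo> remoteFetchInfoFor(String topicPartition, long fetchOffset,
                                                        int maxBytes, ReadQuota quota) {
        if (quota.isExceeded()) {
            // The partition contributes no data to this fetch response.
            return Optional.empty();
        }
        return Optional.of(new RemoteFetchInfo(topicPartition, fetchOffset, maxBytes));
    }

    /** Called after the remote read finishes, so later checks see the updated usage. */
    static void onRemoteReadComplete(int bytesRead, ReadQuota quota) {
        quota.record(bytesRead);
    }
}
```

Checking before the read and recording after it means a single large read can push usage above the quota. Subsequent remote reads are then skipped until the rate drops below the quota again.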

It is worth noting that a rogue client can try to read large amounts of data from remote storage, exhausting the broker-level remote read quota. Once the quota is exhausted, all remote reads will be throttled, which will also prevent other well-behaved clients from reading remote data. To address this, we can set client-level fetch quotas (these should be lower than the broker-level remote read quota). This will prevent the rogue client from reading large amounts of remote data, which in turn will prevent the throttling of remote reads for other clients.

New Metrics

The following new metrics will be added:

MBean | Description
kafka.log.remote:type=RemoteLogManager, name=remote-fetch-throttle-time-avg | The average time in milliseconds that remote fetches were throttled by a broker
kafka.log.remote:type=RemoteLogManager, name=remote-fetch-throttle-time-max | The maximum time in milliseconds that remote fetches were throttled by a broker
kafka.log.remote:type=RemoteLogManager, name=remote-copy-throttle-time-avg | The average time in milliseconds that remote copies were throttled by a broker
kafka.log.remote:type=RemoteLogManager, name=remote-copy-throttle-time-max | The maximum time in milliseconds that remote copies were throttled by a broker

Test Plan

Unit tests for both read and write quotas

Rejected Alternatives

Throttling Data Writes

In this section, we discuss some other approaches for throttling remote writes.

  • We can decrease the pool size for the ThreadPoolExecutor. This would prevent too many concurrent uploads from running thus preventing over-utilization of CPU. 

Pros:

    • Simplicity. No new feature needs to be built. We will only need to adjust the thread pool size which can be done dynamically.

Cons:

    • This approach relies on reducing concurrency to achieve lower upload speed. In practice, we would know the maximum upload rate our remote storage can support. It is not straightforward to translate this into the concurrency of upload threads, and doing so requires a trial-and-error approach.
    • Reducing concurrency can also introduce unfairness while uploading segments. When an RLMTask runs for a topic partition, it uploads all the eligible log segments in the same run, preventing uploads for other topic partitions. This can cause delays and lag buildup for other topic partitions, particularly when the thread pool size is small. If some topics have a high message-in rate, the corresponding RLMTasks would end up using all threads in the threadpool, preventing uploads for smaller topics.
  • We could use the QuotaManager differently. Instead of tracking the global upload rate, we could track the upload rate per thread in the writer threadpool. Each thread records its upload speed with the quota manager and checks for quota violations before uploading the next log segment.

Cons:

    • Each thread only gets a fraction of the global quota. This will lead to under-utilization of the resources. For example, if the global quota is 100 MBps and the threadpool size is 10, each thread can never upload at more than 10 MBps. But if some topic partitions have a message-in rate higher than 10 MBps, the upload rate can never catch up with the message-in rate even though the overall upload rate is below 100 MBps.

The alternate architectures are rejected for the disadvantages discussed above.

Throttling Data Reads

In this section, we discuss some other approaches for throttling remote reads.

  • We can decrease the threadpool size for the ThreadPoolExecutor. This would prevent too many concurrent reads.

Pros:

    • Simplicity. No new feature needs to be built. We will only need to adjust the thread pool size which can be done dynamically.

Cons:

    • It requires a trial-and-error approach to set the threadpool size appropriately so as not to exceed a certain read rate from the remote storage.
    • The setting is dependent on the broker hardware and needs to be tuned accordingly.
  • We could use the QuotaManager differently. It will be used by the worker threads in the reader threadpool to check if the read quota has been exceeded. If it isn’t exceeded, the read task is processed. Otherwise, the read task must wait. This can be implemented in two ways:
    • Read Task computes the throttle time using the quota framework and sleeps for that amount of time before executing the request. When the task wakes up, the read rate would have fallen within the specified quota. 
    • Another approach is that instead of waiting, the RLMTask execution can be deferred and can be scheduled to run with some delay, i.e. throttle time. 

The drawback of the above two approaches is that if the remote read quota has been exhausted, the RLM will keep accepting more read tasks to be executed later. The fetch request corresponding to the read task may have already timed out by the time the read task gets executed. This will lead to a waste of resources for the broker. 

  • We could use the throttle time (computed from the quota framework) to throttle the client instead. The throttle time can be propagated back in the response payload and we use it to throttle further requests from the client. However, this approach prevents the client from reading any partition even though it may not request data for a partition with remote data.

The alternate architectures are rejected for the disadvantages discussed above.

