Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-17928 and KAFKA-17980

Motivation

RemoteLogManager uses multiple thread pools to copy segments to remote storage, delete expired segments, and read data from remote storage. Operators may need to tune the number of threads in these pools to handle the incoming load, but the pool sizes cannot currently be changed at runtime. This proposal makes them dynamically configurable.

Public Interfaces

  • Make the following RemoteLogManager configs dynamically updatable. When one of them is updated, the corresponding thread pool is resized:
    • remote.log.manager.copier.thread.pool.size
    • remote.log.manager.expiration.thread.pool.size
    • remote.log.reader.threads
  • Add a new default method isReady(TopicIdPartition) to RemoteLogMetadataManager.
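As a rough illustration of what resizing one of these pools in place can look like, the sketch below wraps a java.util.concurrent.ThreadPoolExecutor and adjusts its core and maximum sizes when a new thread count arrives. ResizableThreadPool and resize are hypothetical names, not the actual RemoteLogManager code, and how the broker delivers the updated config value to resize is assumed. The ordering matters: ThreadPoolExecutor rejects a core size larger than the maximum, so the maximum is raised first when growing and the core is lowered first when shrinking.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical wrapper showing how a fixed-size pool can be resized in place
 * when a dynamic config such as remote.log.reader.threads changes.
 */
final class ResizableThreadPool {
    private final ThreadPoolExecutor executor;

    ResizableThreadPool(int initialSize) {
        // core == max gives fixed-size semantics; the unbounded queue holds excess tasks.
        this.executor = new ThreadPoolExecutor(initialSize, initialSize,
                60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    /** Called by a (hypothetical) dynamic-config listener with the new thread count. */
    synchronized void resize(int newSize) {
        if (newSize < 1) {
            throw new IllegalArgumentException("Thread count must be >= 1, got " + newSize);
        }
        int current = executor.getCorePoolSize();
        if (newSize > current) {
            // Growing: raise the maximum first so core never exceeds max.
            executor.setMaximumPoolSize(newSize);
            executor.setCorePoolSize(newSize);
        } else if (newSize < current) {
            // Shrinking: lower the core first so max never drops below core.
            executor.setCorePoolSize(newSize);
            executor.setMaximumPoolSize(newSize);
        }
    }

    int size() { return executor.getCorePoolSize(); }

    ThreadPoolExecutor executor() { return executor; }

    void shutdown() { executor.shutdown(); }
}
```

Running tasks are not interrupted when the pool shrinks; per the ThreadPoolExecutor contract, threads above the new size exit when they next become idle. If a pool is a ScheduledThreadPoolExecutor, it acts as a fixed-size pool of core threads, so only setCorePoolSize has a useful effect there.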

Proposed Changes

  1. The thread pools used by RemoteLogManager do not allow the thread count to be reconfigured dynamically. As with dynamically updating the request handler thread count, this capability is needed to handle unexpected load on remote storage.
  2. When a broker starts, the RemoteLogMetadataManager (RLMM) is initialized asynchronously in the background. If the broker starts copying segments or deleting expired remote log segments before initialization completes, every RLMM call throws an exception until it is initialized, as in the log below. With the new isReady method, the broker can defer copying and deleting remote log segments until the RLMM is initialized. This handles startup gracefully and avoids exception and error logs while the broker starts.
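A minimal sketch of the gating described in item 2, assuming the copy/expiration work runs as a periodically scheduled Runnable per partition. GatedRemoteLogTask is a hypothetical name, the partition is modeled as a String rather than TopicIdPartition, and the readiness check is passed in as a Predicate that, in a broker, would delegate to RemoteLogMetadataManager#isReady.

```java
import java.util.function.Predicate;

/**
 * Hypothetical periodic copy/expiration task that waits for the remote log
 * metadata manager to become ready instead of failing.
 */
final class GatedRemoteLogTask implements Runnable {
    private final String topicIdPartition;          // stand-in for TopicIdPartition
    private final Predicate<String> metadataReady;  // would wrap RemoteLogMetadataManager#isReady
    private int skippedRuns = 0;
    private int executedRuns = 0;

    GatedRemoteLogTask(String topicIdPartition, Predicate<String> metadataReady) {
        this.topicIdPartition = topicIdPartition;
        this.metadataReady = metadataReady;
    }

    @Override
    public void run() {
        if (!metadataReady.test(topicIdPartition)) {
            // Metadata still loading: return quietly instead of throwing.
            // Assuming periodic scheduling, the task simply runs again on its next interval.
            skippedRuns++;
            return;
        }
        executedRuns++;
        // ... copy new segments / delete expired segments here ...
    }

    int skippedRuns() { return skippedRuns; }

    int executedRuns() { return executedRuns; }
}
```

Skipping rather than throwing is what removes the startup WARN entries: the work is only deferred, never dropped, because the next scheduled run checks readiness again.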
Exception during init
[2024-11-06 09:33:16,867] WARN [RemoteLogManager=0 partition=xp3zuxhTQ9uUwk-uQhTx5g:topicA-0] Current task for topic-partition xp3zuxhTQ9uUwk-uQhTx5g:topicA-0 received error but it will be scheduled (kafka.log.remote.RemoteLogManager$RLMTask:793)
java.lang.IllegalStateException: This instance is in invalid state, initialized: false close: false
	at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.ensureInitializedAndNotClosed(TopicBasedRemoteLogMetadataManager.java:557)
	at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.highestOffsetForEpoch(TopicBasedRemoteLogMetadataManager.java:236)
	at kafka.log.remote.RemoteLogManager.findHighestRemoteOffset(RemoteLogManager.java:1827)
	at kafka.log.remote.RemoteLogManager$RLMFollowerTask.execute(RemoteLogManager.java:1434)
	at kafka.log.remote.RemoteLogManager$RLMTask.run(RemoteLogManager.java:784)


RemoteLogMetadataManager
    /**
     * Denotes whether the partition metadata is ready to serve.
     *
     * @param topicIdPartition topic partition
     * @return true if the partition is ready to serve remote storage operations.
     */
    default boolean isReady(TopicIdPartition topicIdPartition) {
        // Implementations that do not override this method are always treated as ready.
        return true;
    }
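For plugin authors, the following sketch shows one way an asynchronously initialized RLMM might override isReady so that a partition is reported as ready only after its metadata has been loaded. MetadataManagerSketch stands in for RemoteLogMetadataManager (with a String key in place of TopicIdPartition), and AsyncLoadingMetadataManager, markLoaded, and markUnloaded are hypothetical names, not part of the proposed interface.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Stand-in for the proposed default method, keyed by String instead of TopicIdPartition. */
interface MetadataManagerSketch {
    default boolean isReady(String topicIdPartition) {
        return true;
    }
}

/**
 * Hypothetical implementation that loads metadata in the background and reports
 * a partition as ready only once its metadata has been fully loaded.
 */
final class AsyncLoadingMetadataManager implements MetadataManagerSketch {
    // Thread-safe set: written by the loader thread, read by RemoteLogManager task threads.
    private final Set<String> loadedPartitions = ConcurrentHashMap.newKeySet();

    /** Called by the (hypothetical) background loader once a partition's metadata is caught up. */
    void markLoaded(String topicIdPartition) {
        loadedPartitions.add(topicIdPartition);
    }

    /** Called when the broker stops serving the partition's metadata. */
    void markUnloaded(String topicIdPartition) {
        loadedPartitions.remove(topicIdPartition);
    }

    @Override
    public boolean isReady(String topicIdPartition) {
        return loadedPartitions.contains(topicIdPartition);
    }
}
```

Tracking readiness per partition, rather than with a single global flag, lets tasks for already-loaded partitions proceed while others are still catching up.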


Compatibility, Deprecation, and Migration Plan

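  • The proposed changes are fully backward compatible. isReady has a default implementation that returns true, so existing RemoteLogMetadataManager implementations continue to work without changes.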

Test Plan

  • Unit test asserting that the RemoteLogManager thread-pool configs can be updated dynamically and that the corresponding thread pools are resized.
  • Unit test asserting that the copy/expiration logic is not invoked for a partition until the RLMM is ready to serve remote storage operations.

Rejected Alternatives

