Status
Current state: Accept
Discussion thread: here
Vote thread: here
JIRA: KAFKA-19426
PR: https://github.com/apache/kafka/pull/20203
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
TopicBasedRemoteLogMetadataManager (TBRLMM) is the key built-in implementation of RemoteLogMetadataManager for the tiered storage feature. It uses a Kafka topic to store the metadata for remote storage.
When we started using it to support tiered storage, we found that TBRLMM's initialization sometimes failed in some Kafka clusters.
The root cause is that the initialization (TopicBasedRemoteLogMetadataManager#initializeResources) accesses the __remote_log_metadata topic before the broker is ready to handle requests.
This can be seen in the broker's startup sequence, shown in the following snapshot:
As a result, the initialization has to rely on retries until the broker is ready, and it may retry many times:
FYI, according to one of kamalcph's test results:
Thus, the critical failure case is that the default retry timeout (DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS: 2 minutes) is not long enough for Kafka clusters that take more than 2 minutes to complete startup.
In that case, the initialization fails. The failure breaks the tiered storage feature: for example, local log data never gets deleted, along with other issues that vary from case to case.
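To make this failure mode concrete, here is a minimal simulation of a retry loop bounded by a maximum timeout. It is not TBRLMM's actual code: the simulated clock, the fixed backoff between attempts, and the names RetryWindowSimulation, initialize, brokerReadyAtMs and backoffMs are all hypothetical. It only shows that when the timer starts before the broker is ready, broker startup time is consumed from the same retry budget.

```java
/**
 * Hypothetical simulation (not Kafka code): initialization keeps retrying until
 * the broker is ready or the retry budget is exhausted. Time is simulated in ms.
 */
final class RetryWindowSimulation {

    /** Outcome of one simulated initialization. */
    record Result(boolean succeeded, int attempts, long finishedAtMs) { }

    /**
     * @param timerStartMs    simulated time at which the retry timer starts
     * @param brokerReadyAtMs simulated time at which the broker can serve requests
     * @param maxTimeoutMs    retry budget, e.g. 120_000 ms (2 minutes)
     * @param backoffMs       hypothetical fixed delay between attempts
     */
    static Result initialize(long timerStartMs, long brokerReadyAtMs, long maxTimeoutMs, long backoffMs) {
        long now = timerStartMs;
        long deadline = timerStartMs + maxTimeoutMs;
        int attempts = 0;
        while (now <= deadline) {
            attempts++;
            if (now >= brokerReadyAtMs) {
                // The broker answers, so the metadata topic can be read.
                return new Result(true, attempts, now);
            }
            // Broker not ready yet: in a real cluster this is where the
            // "Connection to node ... could not be established" WARN shows up.
            now += backoffMs;
        }
        // Retry budget exhausted before the broker became ready.
        return new Result(false, attempts, now);
    }
}
```

For example, initialize(0, 150_000, 120_000, 1_000) models a broker that needs 150 seconds to start while the timer starts at t=0: every attempt fails and initialization gives up. With initialize(150_000, 150_000, 120_000, 1_000), where the timer only starts once the broker is ready, the first attempt succeeds.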
As a workaround, we can measure each Kafka cluster's startup time and set remote.log.metadata.initialization.retry.max.timeout.ms (default: DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS) to a very large value, since startup time may increase in the future.
However, it is not reasonable to require a configuration to be changed again and again, or to be set to a very large value. Moreover, as mentioned above, the logs contain many WARN messages indicating that the connection is not available, such as the following. This change avoids them as well.
[2025-07-19 18:00:17,923] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (10.20.4.98:9559) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient)
This KIP is proposed to solve this issue.
Public Interfaces
N/A
Note: In the original design, we defined a public interface named BrokerReadyCallback for users, located in the package org.apache.kafka.server.common. After discussion, we agreed that this design would be over-engineered at this stage, so we decided to move it to org.apache.kafka.server.log.remote.metadata.storage. If any other module needs it in the future, we can propose moving it back. Accordingly, the interface description has been moved from this section to "Proposed Changes".
Proposed Changes
Add a dedicated interface in the package:
org.apache.kafka.server.log.remote.metadata.storage
/**
* Simple callback interface for broker ready notification.
*/
public interface BrokerReadyCallback {
/**
* This method will be called during broker startup for the implementation
* which needs delayed initialization until the broker can process requests.
*/
void onBrokerReady();
}
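This section does not spell out the broker-side call site (the PR has the actual wiring). The following is a hypothetical sketch of how a broker could notify plugins once startup completes: only instances that opt in by implementing BrokerReadyCallback get onBrokerReady() called. The class BrokerStartupSketch and the method notifyBrokerReady are illustrative names, and the interface is copied locally so the sketch compiles on its own.

```java
import java.util.List;

/** Local copy of the proposed interface so this sketch is self-contained. */
interface BrokerReadyCallback {
    void onBrokerReady();
}

/** Hypothetical broker-side hook; not the actual BrokerServer code. */
final class BrokerStartupSketch {

    /**
     * Called once the broker can process requests. Plugins that do not
     * implement BrokerReadyCallback are left untouched.
     *
     * @return the number of plugins that were notified
     */
    static int notifyBrokerReady(List<?> plugins) {
        int notified = 0;
        for (Object plugin : plugins) {
            if (plugin instanceof BrokerReadyCallback callback) {
                callback.onBrokerReady();
                notified++;
            }
        }
        return notified;
    }
}
```

Using an opt-in interface keeps the change invisible to RemoteLogMetadataManager implementations that can initialize eagerly: they simply never receive the callback.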
Make TopicBasedRemoteLogMetadataManager implement the interface to delay its initialization:
We postpone the part of TopicBasedRemoteLogMetadataManager's initialization that queries metadata from the remote log metadata topic until the broker is ready to handle requests.
The retry timeout then stays reasonable regardless of how long a given Kafka cluster takes to start, and the "connection not available" warnings no longer appear.
public class TopicBasedRemoteLogMetadataManager implements BrokerReadyCallback, RemoteLogMetadataManager {
// Other code omitted.
/**
* Invoked when the broker is ready to handle requests. This triggers the initialization of
* resources including Kafka clients that operate on the remote log metadata topic.
* <p>
* The target cluster for the topic is determined by configuration and can be either:
* <ol>
* <li>The local cluster (most common) - the initialization is deferred until the broker is ready
* to handle requests. Early initialization would lead to connection failures.</li>
* <li>A remote cluster - the delay is not necessary but causes no harm.</li>
* </ol>
* <p>
* By using the broker ready state as the initialization trigger, the implementation optimally handles
* the typical case while remaining correct for alternative configurations.
*/
@Override
public void onBrokerReady() {
log.info("Broker is ready for requests, now initializing topic-based RLMM resources");
initializationThread.start();
}
}
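The pattern used in onBrokerReady() above (prepare the initialization thread early, start it only when the broker is ready) can be reduced to the self-contained sketch below. It is not the TBRLMM source: DeferredInitPlugin, awaitInitialized, isInitialized, and the Runnable standing in for initializeResources are hypothetical, and the compareAndSet guard against a repeated notification is an addition of this sketch, since Thread.start() may only be called once.

```java
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Local copy of the proposed interface so this sketch is self-contained. */
interface BrokerReadyCallback {
    void onBrokerReady();
}

/** Hypothetical plugin that defers expensive initialization until the broker is ready. */
final class DeferredInitPlugin implements BrokerReadyCallback {
    private final CountDownLatch initialized = new CountDownLatch(1);
    private final AtomicBoolean started = new AtomicBoolean(false);
    private volatile Thread initializationThread;

    /** Cheap setup only: prepare, but do not start, the initialization thread. */
    void configure(Map<String, ?> configs, Runnable initializeResources) {
        // configs would be parsed here; they are unused in this sketch.
        initializationThread = new Thread(() -> {
            initializeResources.run(); // e.g. create clients and read the metadata topic
            initialized.countDown();
        }, "deferred-init");
        initializationThread.setDaemon(true);
    }

    @Override
    public void onBrokerReady() {
        // Start at most once; any retry timer inside initializeResources begins only now.
        if (started.compareAndSet(false, true)) {
            initializationThread.start();
        }
    }

    /** Waits for initialization; returns false on timeout or interruption. */
    boolean awaitInitialized(long timeout, TimeUnit unit) {
        try {
            return initialized.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    boolean isInitialized() {
        return initialized.getCount() == 0;
    }
}
```

Because nothing touches the metadata topic before onBrokerReady(), a plugin configured during broker startup performs no connection attempts, and therefore logs no connection warnings, while the broker is still starting.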
For details, refer to https://github.com/apache/kafka/pull/20203/files
Compatibility, Deprecation, and Migration Plan
- Compatibility consideration: the behavior of remote.log.metadata.initialization.retry.max.timeout.ms changes.
Previously, the timer was started during TopicBasedRemoteLogMetadataManager initialization, before the broker was ready to handle requests.
After this change, the timer starts after the broker is ready to handle requests.
In effect, this change extends the existing timeout window, because the time the broker spends starting up no longer counts against it. It does not break existing functionality and introduces no compatibility issues. The documentation for this configuration can be updated to clarify when the timer starts.
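The shift in the timer's starting point can be written as a small deadline computation. The helper below is purely illustrative: the class and method names and the millisecond timestamps are hypothetical, and 120_000 ms corresponds to the 2-minute default mentioned in the Motivation section.

```java
/** Hypothetical helper comparing the retry deadline before and after this KIP. */
final class RetryDeadline {
    static final long DEFAULT_MAX_TIMEOUT_MS = 120_000L; // 2 minutes

    /** Before: the timer starts when TBRLMM initializes, before the broker can serve requests. */
    static long deadlineBefore(long rlmmInitAtMs, long maxTimeoutMs) {
        return rlmmInitAtMs + maxTimeoutMs;
    }

    /** After: the timer starts when the broker is ready, i.e. when onBrokerReady() runs. */
    static long deadlineAfter(long brokerReadyAtMs, long maxTimeoutMs) {
        return brokerReadyAtMs + maxTimeoutMs;
    }

    /** Retry time left once the broker can answer; a negative value means it already expired. */
    static long budgetAfterReady(long deadlineMs, long brokerReadyAtMs) {
        return deadlineMs - brokerReadyAtMs;
    }
}
```

With TBRLMM initialized at t=10 s and the broker ready at t=190 s, the old deadline is 130 s, so the budget had already run out 60 s before the broker could answer. The new deadline is 310 s, which leaves the full 120 s for retries regardless of startup time.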
Test Plan
- The patch will be covered by a deployment test that checks that startup succeeds without any connection retry errors for remote storage. You can refer to the test case.
Rejected Alternatives
- Another approach discussed was to not call RLMM#configure() while instantiating the RLMM in RemoteLogManager#L422, and instead define a new method, RemoteLogManager#configureRLMM, that could be called from BrokerServer. You can refer to the code.
However, this change breaks the following contract:
According to KIP-877: "If a plugin implements this interface, the withPluginMetrics() method will be called when the plugin is instantiated (after configure() if the plugin also implements Configurable). "
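The contract quoted above is about call ordering during plugin instantiation. The sketch below uses simplified local stand-ins for Kafka's Configurable and for the Monitorable interface introduced by KIP-877 (in Kafka, withPluginMetrics() receives a PluginMetrics object; here it is reduced to a String tag), plus a hypothetical instantiate helper that follows the quoted order. RecordingPlugin and PluginLifecycleSketch are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Simplified stand-in for org.apache.kafka.common.Configurable. */
interface Configurable {
    void configure(Map<String, ?> configs);
}

/** Simplified stand-in for KIP-877's Monitorable (real method takes a PluginMetrics). */
interface Monitorable {
    void withPluginMetrics(String pluginMetricsTag);
}

/** Hypothetical plugin that records the order of lifecycle calls. */
final class RecordingPlugin implements Configurable, Monitorable {
    final List<String> calls = new ArrayList<>();

    @Override
    public void configure(Map<String, ?> configs) {
        calls.add("configure");
    }

    @Override
    public void withPluginMetrics(String pluginMetricsTag) {
        calls.add("withPluginMetrics");
    }
}

final class PluginLifecycleSketch {
    /** Hypothetical instantiation helper: configure() first, then withPluginMetrics(). */
    static <T> T instantiate(T plugin, Map<String, ?> configs) {
        if (plugin instanceof Configurable c) {
            c.configure(configs);
        }
        if (plugin instanceof Monitorable m) {
            m.withPluginMetrics("remote-log-metadata-manager");
        }
        return plugin;
    }
}
```

If configure() were instead deferred to a later call from BrokerServer, as in the rejected alternative, the recorded order would become withPluginMetrics followed by configure, so metrics registration would run against an unconfigured plugin and violate the contract.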


