Status
Current state: "Accepted"
Discussion thread: here
Vote thread: here
JIRA: KAFKA-19655, KAFKA-20001
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, num.partitions and default.replication.factor are applied inconsistently:
1. Normal topic auto creation (when auto.create.topics.enable is enabled): AutoTopicCreationManager uses the values from broker.properties to build the CreateTopicsRequest for normal (non-internal) topics.
2. Streams internal topic creation: AutoTopicCreationManager builds the CreateTopicsRequest without applying the values from broker.properties. The request is then sent to the controller, which applies the values from controller.properties.
3. AdminClient#createTopics: the CreateTopicsRequest is forwarded to the controller, which applies the values from controller.properties if they are not set in the request.
This results in inconsistency: (1) relies on broker config, while (2) and (3) rely on controller config.
It causes confusion and makes maintenance harder, since the actual behavior is only clear by inspecting the code.
To resolve this, we should make these two configs (num.partitions and default.replication.factor) only effective in controller.properties.
Specifically, we should modify the current behavior in (1) and remove the application of these configs in AutoTopicCreationManager when creating CreateTopicsRequest for normal topics.
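The inconsistency described above can be modeled in a few lines. The following is a minimal sketch, not Kafka code: the class, enum, and method names are hypothetical, and it assumes the broker and the controller were each started with their own value of num.partitions.

```java
public final class CurrentBehaviorSketch {
    // Hypothetical labels for the three creation paths (1), (2), (3) listed in the Motivation.
    enum CreationPath { AUTO_CREATE, STREAMS_INTERNAL, ADMIN_CLIENT_UNSET }

    /**
     * Returns the num.partitions value that ends up on the topic today.
     * Path (1) fills the request on the broker; paths (2) and (3) leave the
     * field unset, so the controller applies its own value.
     */
    static int effectiveNumPartitions(CreationPath path, int brokerValue, int controllerValue) {
        return path == CreationPath.AUTO_CREATE ? brokerValue : controllerValue;
    }

    public static void main(String[] args) {
        int brokerValue = 3;      // assumed value in broker.properties
        int controllerValue = 6;  // assumed value in controller.properties
        for (CreationPath path : CreationPath.values()) {
            System.out.println(path + " -> " + effectiveNumPartitions(path, brokerValue, controllerValue));
        }
    }
}
```

With different values on the two roles, the same cluster creates topics with different defaults depending only on which path created them, which is the behavior this KIP removes.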
Public Interfaces
In 4.x, the documentation of both configs should clearly describe which paths they affect and which role's value is used:
- public static final String NUM_PARTITIONS_DOC = "The default number of log partitions per topic";
+ public static final String NUM_PARTITIONS_DOC =
+     "The default number of log partitions per topic. This configuration affects the following paths:" +
+     "<ul>" +
+     "  <li>1. Auto topic creation</li>" +
+     "  <li>2. Internal streams topic creation</li>" +
+     "  <li>3. Topic creation via <code>AdminClient#createTopics</code> when the number of partition is set to -1</li>" +
+     "</ul>" +
+     "<p>For (1), the value from the broker configuration is used only when it is explicitly set. " +
+     "If it is not explicitly configured on the broker, the value from the controller configuration is used.</p>" +
+     "<p>For (2) and (3), the value from the controller configuration is always used.</p>";

- public static final String DEFAULT_REPLICATION_FACTOR_DOC = "The replication factor for automatically created topics," +
-     " and for topics created with -1 as the replication factor";
+ public static final String DEFAULT_REPLICATION_FACTOR_DOC =
+     "The default replication factor per topic. This configuration affects the following paths:" +
+     "<ul>" +
+     "  <li>1. Auto topic creation</li>" +
+     "  <li>2. Internal streams topic creation</li>" +
+     "  <li>3. Topic creation via <code>AdminClient#createTopics</code> when the replication factor is set to -1</li>" +
+     "</ul>" +
+     "<p>For (1), the value from the broker configuration is used only when it is explicitly set. " +
+     "If it is not explicitly configured on the broker, the value from the controller configuration is used.</p>" +
+     "<p>For (2) and (3), the value from the controller configuration is always used.</p>";
In 5.0, the documentation should change to:
public static final String NUM_PARTITIONS_DOC = "The default number of log partitions per topic. This only takes effect when specified in the controller configuration.";
public static final String DEFAULT_REPLICATION_FACTOR_DOC = "The default replication factor for topics, and for topics created with -1 as the replication factor. This only takes effect when specified in the controller configuration.";
In addition, remove the num.partitions=1 setting from the config/broker.properties example file.
Proposed Changes
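In 4.x, to keep backward compatibility, num.partitions and default.replication.factor from broker.properties take effect only when the user has explicitly defined them. Otherwise the request leaves both fields unset, and the controller applies its own values.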
@@ -321,10 +322,13 @@ class DefaultAutoTopicCreationManager(
             .setReplicationFactor(config.shareCoordinatorConfig.shareCoordinatorStateTopicReplicationFactor())
             .setConfigs(convertToTopicConfigCollections(shareCoordinator.shareGroupStateTopicConfigs()))
       case topicName =>
+        val numPartitions: java.lang.Integer = if (config.originals.containsKey(ServerLogConfigs.NUM_PARTITIONS_CONFIG)) config.numPartitions else CreateTopicsRequest.NO_NUM_PARTITIONS
+        val replicationFactor: java.lang.Short = if (config.originals.containsKey(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG)) config.defaultReplicationFactor.toShort else CreateTopicsRequest.NO_REPLICATION_FACTOR
+
         new CreatableTopic()
           .setName(topicName)
-          .setNumPartitions(config.numPartitions)
-          .setReplicationFactor(config.defaultReplicationFactor.shortValue)
+          .setNumPartitions(numPartitions)
+          .setReplicationFactor(replicationFactor)
     }
   }
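The two-step resolution that the diff above sets up (broker sends its value only if explicitly configured, controller fills in anything left unset) can be illustrated in isolation. This is a simplified sketch rather than the actual implementation: the class and method names are hypothetical, the maps stand in for KafkaConfig#originals, and the sentinel mirrors CreateTopicsRequest.NO_NUM_PARTITIONS, which is -1. Only num.partitions is shown; default.replication.factor would follow the same pattern.

```java
import java.util.Map;

public final class ExplicitOverrideSketch {
    // Stand-in for CreateTopicsRequest.NO_NUM_PARTITIONS: "let the controller decide".
    static final int NO_NUM_PARTITIONS = -1;

    /** Broker side: send a value only if the operator wrote num.partitions in broker.properties. */
    static int requestedNumPartitions(Map<String, String> brokerOriginals) {
        String explicit = brokerOriginals.get("num.partitions");
        return explicit != null ? Integer.parseInt(explicit) : NO_NUM_PARTITIONS;
    }

    /** Controller side: replace the sentinel with the controller's own configured value. */
    static int resolveOnController(int requested, int controllerNumPartitions) {
        return requested == NO_NUM_PARTITIONS ? controllerNumPartitions : requested;
    }

    public static void main(String[] args) {
        int controllerValue = 3; // assumed value in controller.properties

        // Broker explicitly sets num.partitions: the broker value is kept in 4.x.
        int explicit = requestedNumPartitions(Map.of("num.partitions", "8"));
        System.out.println(resolveOnController(explicit, controllerValue));

        // Broker does not set it: the controller value is used.
        int unset = requestedNumPartitions(Map.of());
        System.out.println(resolveOnController(unset, controllerValue));
    }
}
```

Checking the original (user-supplied) properties rather than the parsed config matters here, because the parsed config always carries a value once defaults are applied and so cannot tell "explicitly set" apart from "defaulted".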
In addition, log a warning if the user sets num.partitions or default.replication.factor in broker.properties.
@@ -579,6 +579,14 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
       // warn if create.topic.policy.class.name or alter.config.policy.class.name is defined in the broker role
       warnIfConfigDefinedInWrongRole(ProcessRole.ControllerRole, ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG)
       warnIfConfigDefinedInWrongRole(ProcessRole.ControllerRole, ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG)
+      if (originals.containsKey(ServerLogConfigs.NUM_PARTITIONS_CONFIG)) {
+        warn(s"${ServerLogConfigs.NUM_PARTITIONS_CONFIG} is defined in the broker role. This configuration will be ignored in 5.0. " +
+          s"Please set ${ServerLogConfigs.NUM_PARTITIONS_CONFIG} in the controller role instead.")
+      }
+      if (originals.containsKey(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG)) {
+        warn(s"${ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG} is defined in the broker role. This configuration will be ignored in 5.0. " +
+          s"Please set ${ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG} in the controller role instead.")
+      }
     } else if (processRoles == Set(ProcessRole.ControllerRole)) {
       // KRaft controller-only
       validateQuorumVotersAndQuorumBootstrapServerForKRaft()
In 5.0, we can stop using num.partitions and default.replication.factor from broker.properties in AutoTopicCreationManager and remove the warning added to KafkaConfig.
      case topicName =>
        new CreatableTopic()
          .setName(topicName)
          .setNumPartitions(CreateTopicsRequest.NO_NUM_PARTITIONS)
          .setReplicationFactor(CreateTopicsRequest.NO_REPLICATION_FACTOR)
    }
  }
Compatibility, Deprecation, and Migration Plan
This KIP introduces a breaking change because it modifies where these two configurations are applied. However, the actual breaking change will only take effect in version 5.0; until then, we will maintain backward compatibility.
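Operators who want to prepare before 5.0 could check their broker-only configuration files for the two affected keys. The following is a hypothetical helper, not part of this KIP or of Kafka: the class and method names are made up for illustration, and it uses only java.util.Properties to parse a properties file.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public final class BrokerConfigMigrationCheck {
    // The two configs that only take effect on the controller from 5.0 onward.
    static final List<String> CONTROLLER_ONLY =
        List.of("num.partitions", "default.replication.factor");

    /** Returns the affected keys that are explicitly present in the given broker properties. */
    static List<String> keysToMove(Properties brokerProps) {
        List<String> found = new ArrayList<>();
        for (String key : CONTROLLER_ONLY) {
            if (brokerProps.containsKey(key)) {
                found.add(key);
            }
        }
        return found;
    }

    public static void main(String[] args) throws IOException {
        // Inline sample content; a real check would load the actual broker.properties file.
        Properties props = new Properties();
        props.load(new StringReader("process.roles=broker\nnum.partitions=1\n"));
        for (String key : keysToMove(props)) {
            System.out.println(key + " is set in broker.properties; move it to controller.properties before 5.0");
        }
    }
}
```

Any key this reports corresponds to a warning the broker logs in 4.x, and to a setting that is silently ignored in 5.0.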
Test Plan
Covered by unit tests and integration tests.
Rejected Alternatives
- Apply broker num.partitions and default.replication.factor only when explicitly specified in broker.properties, otherwise fall back to controller.properties:
  This approach is not considered viable because it is difficult to maintain. It would require adding logic to every place where CreateTopicsRequest is handled. Moreover, the broker configs are only used in a single case today (i.e., AutoTopicCreationManager creating normal topics). Supporting this behavior would therefore be overkill.
- Apply changes to the related internal topic configs (e.g., share.coordinator.state.topic.num.partitions, share.coordinator.state.topic.replication.factor):
  This would split the internal topic config scope between the controller and the broker, which is neither ideal nor intuitive. Users would have to configure some properties in the controller to take effect, while others would still need to be set in the brokers. This dual scope is confusing and should be avoided.