
KIP-158: Kafka Connect should allow source connectors to set topic-specific settings for new topics


| Property | Type | Default | Possible Values | Description |
|---|---|---|---|---|
| | boolean | true | true, false | Whether the Connect worker should allow source connector configurations to define topic creation settings. When 'true', source connectors can use this feature. When 'false', new source connector configurations that use the `topic.creation.*` properties would be rejected with an error, while those properties would be ignored (and a warning reported) for previously-registered source connector configurations that use them. |
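
The behavior described for this worker property can be sketched as a small validation routine. This is only an illustrative model and not Connect's actual implementation: the function name `check_connector_config`, its parameters, and its return value are hypothetical, chosen to make the three cases in the description explicit.

```python
import logging

log = logging.getLogger("connect.sketch")

TOPIC_CREATION_PREFIX = "topic.creation."


def check_connector_config(config, worker_allows_topic_creation, previously_registered):
    """Return the connector config the worker would act on (hypothetical helper).

    Raises ValueError when a new connector config uses topic.creation.*
    properties while the worker has the feature turned off.
    """
    creation_keys = sorted(k for k in config if k.startswith(TOPIC_CREATION_PREFIX))
    if worker_allows_topic_creation or not creation_keys:
        # Feature allowed, or not used at all: the config is accepted unchanged.
        return dict(config)
    if previously_registered:
        # Existing configs keep working: the properties are ignored with a warning.
        log.warning("Ignoring topic creation properties: %s", creation_keys)
        return {k: v for k, v in config.items() if k not in creation_keys}
    # New configs that rely on a disabled feature are rejected.
    raise ValueError("Topic creation is disabled on this worker: %s" % creation_keys)
```

Rejecting new configurations while tolerating previously-registered ones means that turning the feature off cannot break connectors that are already running.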

Source Connector Configuration


Therefore, in order to use this feature, the Kafka principal specified in the worker configuration and used for the source connectors (e.g., `producer.*`) must have permission to DESCRIBE and CREATE topics. If the worker's producer does not have the privileges needed to DESCRIBE existing topics and CREATE missing topics, but a source connector does specify the `topic.creation.*` configuration properties, the worker will log a WARNING and will default to the previous behavior of assuming the topics already exist or that the broker will auto-create them when needed.

Note that when the Connect worker starts up, it already has the ability to create in the Kafka cluster the internal topics used for storing connector configurations, connector and task statuses, and source connector offsets. If creating topics is not desired for security purposes, this feature should be disabled.

Compatibility, Deprecation, and Migration Plan

When users upgrade an existing Kafka Connect installation, they do not need to change any configurations or upgrade any connectors. Because previously-registered source connector configurations would not include any `topic.creation.*` configuration properties, Kafka Connect will behave exactly as before by assuming the topics exist or else will be auto-created by the broker. There are no current plans to remove this legacy behavior.

After upgrading, users must make sure the feature is enabled in the configuration of all workers in the Connect cluster, and must then modify the configuration of any source connector that should create new topics by adding the `topic.creation.default.replication.factor` and `topic.creation.default.partitions` properties, plus optionally other `topic.creation.default.*` properties.
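
As a rough illustration, a source connector configuration that opts in to topic creation might look like the dictionary below. The connector name, the connector class, and the `cleanup.policy` entry are placeholder assumptions; only the two required property names come from this proposal. The helpers `topic_creation_requested` and `default_topic_settings` are hypothetical and simply show how the presence of both required properties could act as the on/off switch.

```python
# Both properties must be present for a connector to request topic creation.
REQUIRED = (
    "topic.creation.default.replication.factor",
    "topic.creation.default.partitions",
)

DEFAULT_PREFIX = "topic.creation.default."

# Placeholder connector configuration; values are illustrative only.
example_config = {
    "name": "example-source",                            # assumed name
    "connector.class": "com.example.MySourceConnector",  # assumed class
    "topic.creation.default.replication.factor": "3",
    "topic.creation.default.partitions": "5",
    "topic.creation.default.cleanup.policy": "compact",  # optional extra setting
}


def topic_creation_requested(config):
    """True when the config declares both required default properties."""
    return all(key in config for key in REQUIRED)


def default_topic_settings(config):
    """Collect the topic.creation.default.* entries with the prefix stripped."""
    return {
        key[len(DEFAULT_PREFIX):]: value
        for key, value in config.items()
        if key.startswith(DEFAULT_PREFIX)
    }
```

A configuration without these properties yields `False` from `topic_creation_requested`, which corresponds to the unchanged legacy behavior.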

This feature will not affect source or sink connector implementations, as the connector API is unchanged and running connectors have no exposure to this feature. It also does not change the topic-specific settings on any existing topics.

Finally, this feature uses Kafka's Admin API methods to check for the existence of a topic and to create new topics. This feature will do nothing if the broker does not support the Admin API methods, which is equivalent to relying upon auto-topic creation. If ACLs are used, the Kafka principal used in the Connect worker's `producer.*` settings is assumed to have privilege to create topics when needed; if not, an error will be logged but the worker will revert to the old behavior of assuming the topics exist or will be auto-created by the broker.
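
The check-then-create flow and its fallbacks can be sketched as follows. This is a simplified model under stated assumptions, not the worker's real code: the `admin` object, its `topic_exists` and `create_topic` methods, the two exception classes, and the returned strings are all hypothetical stand-ins for the Admin API interactions described above, and `InMemoryAdmin` exists only so the sketch can run without a broker.

```python
import logging

log = logging.getLogger("connect.sketch")


class AdminUnsupported(Exception):
    """Hypothetical: the broker does not support the Admin API methods."""


class NotAuthorized(Exception):
    """Hypothetical: the principal lacks DESCRIBE or CREATE on topics."""


def ensure_topic(admin, topic, settings):
    """Create `topic` if it is missing and report what happened."""
    try:
        if admin.topic_exists(topic):
            return "exists"
        admin.create_topic(topic, settings)
        return "created"
    except AdminUnsupported:
        # Nothing to do: equivalent to relying on broker auto-creation.
        return "fallback"
    except NotAuthorized:
        # Log and revert to assuming the topic exists or will be auto-created.
        log.error("Not authorized to create topic %s", topic)
        return "fallback"


class InMemoryAdmin:
    """Stand-in admin client used only to exercise the sketch."""

    def __init__(self, existing=(), can_create=True):
        self.topics = {name: {} for name in existing}
        self.can_create = can_create

    def topic_exists(self, topic):
        return topic in self.topics

    def create_topic(self, topic, settings):
        if not self.can_create:
            raise NotAuthorized(topic)
        self.topics[topic] = dict(settings)
```

In both fallback cases the function returns normally, so a producer would carry on exactly as it did before this feature existed.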

Rejected Alternatives

Several alternative designs were considered but ultimately rejected:

  1. Change only the Java API and have no configuration changes. This very simple approach would have required no changes to a connector configuration yet still given the source connector tremendous flexibility and responsibility in defining the topic-specific settings for each new topic (e.g., using the Admin API). This approach was rejected because it still relies upon the connector implementation to handle all variation in topic-specific settings that might be desired between new topics; because connector users have very little control over the topic-specific settings; and because each connector would have to be modified to take advantage of the new feature, which would therefore not work with older connectors.
  2. Change the Java API and use connector configuration properties to define the topic-specific settings used as defaults on all topics. This approach is a bit more flexible than the first alternative in that it allows for connector users to specify some default topic-specific settings in configuration properties. However, this approach was rejected because it offers connector users very little flexibility since it still relies upon the source connector to determine the settings for each of the topics.
  3. Change the Java API and use connector configuration properties to define the topic-specific settings using rules that apply different settings to different topics. This approach was proposed in an earlier version of this KIP, but discussion highlighted that this was optimizing for the exceptional case where source connectors wrote to many topics and those topics needed different replication factors, numbers of partitions, and/or topic-specific settings. This resulted in a very complex configuration that was thought to be useful in a very small number of cases. It also exposed connectors to a new Java API, which again would require changes in the source connector implementations and would restrict the Connect versions on which those connectors could be deployed.
  4. Allow the connector to modify the topic-specific settings on an existing topic. This can be complicated, since not all topic settings can be easily changed. It also would introduce potential conflicts between a connector and other admin clients that are attempting to change the topic configuration settings to different values. Such a scenario would be extremely confusing to users, since they might not expect that the source connector is configured to modify the topic settings for an existing topic.
  5. Should `topic.creation.default.replication.factor` have a default value? A default replication factor of 3 is a sensible default for production, but it would fail on small development clusters. By making this property explicit, users who configure source connectors have to choose a value that makes sense for their Kafka cluster. Having no default also means that this property is required to enable topic creation on a source connector, which obviates the need for a separate `topic.creation.enabled` property in the connector configuration.

  6. Should the default value for `topic.creation.default.replication.factor` take into account the current number of brokers? Doing so would be very brittle and subject to transient network partitions and/or failed brokers, since the actual number of brokers might be smaller than the replication factor assumed by the user creating the connector configuration, and the user would have no feedback that a topic was created with fewer replicas than desired.
  7. Should `topic.creation.default.partitions` have a default value? The only sensible default is 1, and that's not always very sensible.

  8. Should the Connect worker have a new `disable.topic.creation.for.connectors` property? This property allows operators of a Connect cluster to prevent source connectors from even using this feature. It would be possible (albeit more complicated) to not have the worker configuration property and to instead expect operators to use ACLs to control whether the Connect worker's producer has CREATE topic permissions.
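
The reasoning in items 5 and 6 can be made concrete with a small sketch. Both functions are hypothetical and only model the trade-off: an explicit replication factor fails visibly when the cluster is too small, whereas a value derived from the current broker count silently yields fewer replicas than the user intended.

```python
def explicit_replication(requested, live_brokers):
    """Explicit setting: the mismatch surfaces as an error the user can see."""
    if requested > live_brokers:
        raise ValueError(
            "replication factor %d exceeds %d available brokers"
            % (requested, live_brokers)
        )
    return requested


def broker_derived_replication(desired, live_brokers):
    """Rejected idea: cap at the brokers visible right now, with no feedback."""
    return min(desired, live_brokers)
```

With three replicas desired and one broker temporarily unreachable in a three-broker cluster, the derived variant would settle on two replicas without any signal to the user, which is the brittleness item 6 describes.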