Current state: Under Discussion
Discussion thread: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
As of 0.11.0.0, Kafka Connect can automatically create its internal topics using the new AdminClient (see KIP-154), but it still relies upon the broker to auto-create new topics to which source connector records are written. This is error prone, as it's easy for the topics to be created with an inappropriate cleanup policy, replication factor, and/or number of partitions. Some users would rather not configure their brokers with auto.create.topics.enable=true, and in these cases users must manually pre-create the necessary topics. That, of course, can be quite challenging for some source connectors that choose topics dynamically based upon the source and that result in large numbers of topics.
Kafka Connect should instead be able to create the topics automatically for source connectors, using a replication factor, number of partitions, and other topic-specific settings declared in a source connector configuration. If these properties are not specified, the previous Connect behavior of relying upon the topics to exist or be auto created by the broker. Additionally, operators of Connect clusters should be able to either enable or disable this feature.
This feature does not affect sink connectors and does not change the topic-specific settings on any existing topics.
This proposal defines a simple way for source connector configurations to specify whether topics to which the source connector will write should be created by Connect if those topics do not already exist. Additionally, this feature is disabled by default for the whole Connect cluster, though it can be enabled via a new Connect worker configuration property.
In order to use this feature, the Connect cluster operator must configure the configurations for all Connect workers in the cluster with `
topic.creation.enable=true`. Even then, the feature will only be used for source connectors whose configuration specifies the default topic attributes for new topics (see below).
This proposal adds one new Connect worker configuration, which must be set identically on all workers in the Connect cluster:
|boolean||true||true, false||Whether the Connect worker should allow source connector configurations to define topic creation settings. When 'true', source connectors can use this feature. When 'false', new source connector configurations that use these `topic.creation.*` configs would error, while these configs would be ignored (and a warning reported) for previously-registered source connector configs that used these properties.|
This proposal adds several source connector configuration properties that specify the default replication factor, number of partitions, and other topic-specific settings to be used by Connect to create any topic to which the source connector writes that does not exist at the time the source connector generates its records. None of these properties has defaults, so therefore this feature is enabled for this connector only when the feature is enabled for the Connect cluster and when the source connector configuration specifies at least the replication factor and number of partitions.
|int||n/a||>= 1 when a value is specified||The replication factor for new topics created for this connector. This value must not be smaller than the number of brokers in the Kafka cluster.|
|int||n/a||>= 1 when a value is specified||The number of partitions new topics created for this connector.|
|n/a||Any of the Kafka topic-level configurations for the version of the Kafka broker where the records will be written. The broker's topic-level configuration value will be used if that configuration is not specified for the rule.|
Note that the Kafka topic-level configurations does vary by Kafka version, so source connectors should specify only those topic settings that the Kafka broker knows about.
Also, these properties have no effect if the feature is disabled on the Connect cluster via `
topic.creation.enable=false` in the cluster's worker configurations.
The replication factor and number of partitions must be specified in the source connector configuration to enable topic creation for the connector. The following shows an example of these properties specifying for all new topics created by Connect for this connector a replication factor of 3 and 5 partitions:
... topic.creation.default.replication.factor=3 topic.creation.default.partitions=5 ...
The source connector configurations can optionally specify Kafka topic-level configurations to override the broker's defaults for new topics. The following shows an example specifies the `compact` cleanup policy, a minimum of 2 in-sync replicas, and no unclean leader election:
... topic.creation.default.replication.factor=3 topic.creation.default.partitions=5 topic.creation.default.cleanup.policy=compact topic.creation.default.min.insync.replicas=2 topic.creation.default.unclean.leader.election.enable=false ...
This feature does not affect sink connectors or their configuration.
The existing Connect REST API includes several resources whose request and response payloads will be affected by this proposal, although the structure of those payloads are already dependent upon the specific type of connector. Applications that use the REST API must already expect such variation, and therefore
When topic creation is enabled in the Connect worker, the worker may attempt to create topics to which the source connector(s) write that are not known to exist. The Admin API allows the Connect worker to request these topics be created, but will only attempt to create topics that do not already exist.
Therefore, in order to use this feature, the Kafka principal specified in the worker configuration and used for the source connectors (e.g., `producer.*`) must have the permission to DESCRIBE and CREATE topics. If the worker's producer does not have the necessary privileges to DESCRIBE existing and CREATE missing topics but a source connector does specify the `topic.creation.*` configuration properties, the worker will log a WARNING and will default to the previous behavior of assuming the topics already exist or that the broker will auto-create them when needed.
Note that when the Connect worker starts up, it already has the ability to create in the Kafka cluster the internal topics used for storing connector configurations, connector and task statuses, and source connector offsets. If creating topics is not desired for security purposes, this feature should be disabled.
When users upgrade an existing Kafka Connect installation, they do not need to change any configurations or upgrade any connectors: this feature will be enabled but as previously-registered source connector configurations would not include any `topic.creation.*` configuration properties, Kafka Connect will behave exactly as before by assuming the topics exist or else will be auto-created by the broker.
After upgrading, users must alter the configuration of any source connector to enable the creation of new topics, by adding the
topic.creation.default.partitions properties plus optionally other
This feature will not affect source or sink connector implementations, as the connector API is unchanged and running connectors have no exposure to this feature. It also does not change the topic-specific settings on any existing topics.
Finally, this feature uses Kafka's Admin API methods to check for the existence of a topic and to create new topics. This feature will do nothing if the broker does not support the Admin API methods, which is equivalent to relying upon auto-topic creation. If ACLs are used, the Kafka principal used in the Connect worker's `producer.*` settings is assumed to have privilege to create topics when needed; if not, an error will be logged but the worker will revert to the old behavior of assuming the topics exist or will be auto-created by the broker.
Several alternative designs were considered but ultimately rejected:
Should `topic.creation.default.replication.factor` have a default value? A default replication factor of 3 is a sensible default for production, but it would fail on small development clusters. By making this property be explicit, users that are configuring source connectors have to choose a value that makes sense for their Kafka cluster. It also has the advantage that not having a default means that this property is required to enable topic creation on a source connector, and this obviates the need for a separate `topic.creation.enabled` in the connector configuration.
Should the `topic.creation.default.partitions` have a default value? The only sensible default is 1, and that's not always very sensible.