...

We will add a single top-level source connector configuration property that specifies the user-defined names of zero or more topic creation rules. Each rule will be defined by one configuration property that defines the regular expression used to match topic names, another configuration property that determines the policy for how matching topics should be created, and additional configuration properties that define the topic-specific settings. Rule names are user-defined and may not contain the comma (',') or period ('.') characters.
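As an illustration of the naming constraint above, the following is a minimal sketch of how the value of the top-level property could be split into rule names. The class name `RuleNames` and its `parse` method are hypothetical and are not part of the proposal; the sketch only assumes that names are comma-separated and that a period is rejected because it would be ambiguous inside a property key.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Kafka Connect API.
final class RuleNames {

    /** Splits the value of "topic.creation" into rule names, in the order given. */
    static List<String> parse(String topicCreationValue) {
        List<String> names = new ArrayList<>();
        if (topicCreationValue == null || topicCreationValue.trim().isEmpty()) {
            return names; // zero rules is allowed
        }
        // A comma can never be part of a name because it is the list separator.
        for (String raw : topicCreationValue.split(",")) {
            String name = raw.trim();
            if (name.isEmpty()) {
                throw new IllegalArgumentException("Empty rule name in: " + topicCreationValue);
            }
            if (name.contains(".")) {
                throw new IllegalArgumentException("Rule name may not contain '.': " + name);
            }
            names.add(name);
        }
        return names;
    }
}
```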

...

| Property | Type | Default | Possible Values | Description |
|---|---|---|---|---|
| topic.creation | list | <empty> | <any> | The names of the topic creation rules, in the order that they should be matched against new topics used by the source connector. |
| topic.creation.${ruleName}.regex | string | n/a | valid regex | The regular expression that will be applied to new topics. The first rule whose regular expression matches the new topic name will be used. |
| topic.creation.${ruleName}.policy | enum | "create" | "autocreate", "create", "fail" | How new topics to which the rule applies should be created. When set to "create", Connect will attempt to use the Admin API to explicitly create the topic. When set to "autocreate", Connect will attempt to let the broker autocreate the topic per its own settings; all topic-specific settings on this rule will be ignored. When set to "fail", any topic to which the rule applies will result in a failure of the task if the topic does not already exist. |
| topic.creation.${ruleName}.replication.factor | int | 3 | >= 1 when a value is specified | The replication factor for the topics created using this rule. |
| topic.creation.${ruleName}.partitions | int | 1 | >= 1 when a value is specified | The number of partitions for the topics created using this rule. |
| topic.creation.${ruleName}.${kafkaTopicSpecificConfigName} | | n/a | | Any of the Kafka topic-level configurations. The broker's topic-level configuration value will be used if that configuration is not specified for the rule. |

The following example defines two topic creation rules named "firstRule" and "defaultRule":

Portion of an example source connector configuration using topic creation rules:
topic.creation=firstRule,defaultRule

topic.creation.firstRule.regex=MyPrefix.*
topic.creation.firstRule.replication.factor=3
topic.creation.firstRule.partitions=5
topic.creation.firstRule.cleanup.policy=compact
topic.creation.firstRule.min.insync.replicas=2
topic.creation.firstRule.unclean.leader.election.enable=false

topic.creation.defaultRule.regex=.*
topic.creation.defaultRule.replication.factor=3
topic.creation.defaultRule.partitions=1
topic.creation.defaultRule.cleanup.policy=compact
topic.creation.defaultRule.min.insync.replicas=2
topic.creation.defaultRule.unclean.leader.election.enable=false

This style of configuration properties is very similar to those defined for . These properties can appear in the connector's configuration in any order, but the order of the names in "topic.creation" is important and defines the order in which the framework evaluates whether each rule applies to a topic with a given name. For example, if a new topic named "MyPrefixABC" is to be created, the framework would first use the regular expression of the "firstRule" to see if it matched the topic name "MyPrefixABC". Because it does, the topic-specific settings defined in the properties beginning with "topic.creation.firstRule." would be used and passed to the connector for validation / overrides and ultimately used to create the topic. However, a topic named "XYZ" would not match the "firstRule" but would match the "defaultRule", and thus the topic-specific settings defined in the configuration properties beginning with "topic.creation.defaultRule." would be used and passed to the connector for validation / overrides and ultimately used to create the topic.
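The evaluation order described above can be sketched as follows. This is only an illustration, not the framework's implementation: the class `TopicCreationRuleMatcher` is hypothetical, and the sketch assumes that a rule applies only when its regular expression matches the entire topic name, which the proposal does not state explicitly.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

// Hypothetical class, not part of the Kafka Connect API.
final class TopicCreationRuleMatcher {

    // Rule name -> compiled regex, kept in the order listed in "topic.creation".
    private final Map<String, Pattern> rules = new LinkedHashMap<>();

    TopicCreationRuleMatcher(Map<String, String> connectorConfig) {
        String ruleList = connectorConfig.getOrDefault("topic.creation", "");
        for (String raw : ruleList.split(",")) {
            String name = raw.trim();
            if (name.isEmpty()) {
                continue; // an empty list means no rules are defined
            }
            String regex = connectorConfig.get("topic.creation." + name + ".regex");
            if (regex == null) {
                throw new IllegalArgumentException("Missing regex for rule: " + name);
            }
            rules.put(name, Pattern.compile(regex));
        }
    }

    /** Returns the name of the first rule whose regex matches the whole topic name. */
    Optional<String> firstMatch(String topic) {
        for (Map.Entry<String, Pattern> rule : rules.entrySet()) {
            if (rule.getValue().matcher(topic).matches()) { // assumption: full match
                return Optional.of(rule.getKey());
            }
        }
        return Optional.empty();
    }
}
```

With the example configuration above, `firstMatch("MyPrefixABC")` selects "firstRule" and `firstMatch("XYZ")` falls through to "defaultRule"; the order of the property lines themselves plays no role, only the order of names in "topic.creation".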

Here is another example that shows how to use one rule for topics whose names begin with "MyPrefix", and a second default rule that will result in connector failure if the framework needs to create topics because they don't yet exist and are used in source records. This provides a way to enforce that a source connector does not attempt to create unwanted topics.

...

Portion of an example source connector configuration using topic creation rules with fail option:

...

Java API

To give source connector implementations the ability to validate or override some or all of these topic-specific settings, we will modify the following existing abstract class in the Kafka Connect public API:

...

Kafka Connect will provide an implementation of TopicSettings. When Kafka Connect sees a topic in the SourceRecords that the worker has not yet seen since starting up, it will check whether that topic exists using the Admin API of the broker. If the topic does not exist, it will find the first applicable topic creation rule, instantiate a TopicSettings object with that rule's topic-specific settings, pass it to the SourceTask's settingsForNewTopic method, and use the resulting TopicSettings instance and its topic-specific settings when the framework attempts to create the new topic. Note that Kafka Connect will altogether skip creating the new topic if no topic creation rule applies to the topic or if the settingsForNewTopic method returns null. Kafka Connect will log these activities at the appropriate level. 
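The sequence of decisions in the paragraph above can be sketched roughly as below. Everything in this block is a hypothetical stand-in: `Settings` approximates the role of TopicSettings (whose actual interface is not reproduced here), `Admin` abstracts the broker Admin API calls, the rule lookup is passed in as a function, and the `taskHook` operator plays the role of the SourceTask's settingsForNewTopic method. It is meant to show the control flow only, not the real signatures.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.UnaryOperator;

// Hypothetical sketch of the worker-side flow; none of these types are the real API.
final class NewTopicFlowSketch {

    /** Stand-in for TopicSettings: a topic name plus its topic-specific settings. */
    static final class Settings {
        final String topic;
        final Map<String, String> values;

        Settings(String topic, Map<String, String> values) {
            this.topic = topic;
            this.values = values;
        }
    }

    /** Stand-in for the Admin API operations the worker needs. */
    interface Admin {
        boolean exists(String topic);
        void create(Settings settings);
    }

    private final Set<String> seenSinceStartup = new HashSet<>();
    private final Admin admin;
    private final Function<String, Optional<Map<String, String>>> ruleLookup;
    private final UnaryOperator<Settings> taskHook;

    NewTopicFlowSketch(Admin admin,
                       Function<String, Optional<Map<String, String>>> ruleLookup,
                       UnaryOperator<Settings> taskHook) {
        this.admin = admin;
        this.ruleLookup = ruleLookup;
        this.taskHook = taskHook;
    }

    /** Returns true only if this call resulted in the topic being created. */
    boolean onRecordTopic(String topic) {
        if (!seenSinceStartup.add(topic)) {
            return false; // already handled since the worker started
        }
        if (admin.exists(topic)) {
            return false; // nothing to create
        }
        Optional<Map<String, String>> ruleSettings = ruleLookup.apply(topic);
        if (!ruleSettings.isPresent()) {
            return false; // no rule applies: skip creation altogether
        }
        Settings proposed = new Settings(topic, new HashMap<>(ruleSettings.get()));
        Settings result = taskHook.apply(proposed); // task may validate or override
        if (result == null) {
            return false; // task declined: skip creation
        }
        admin.create(result);
        return true;
    }
}
```

Passing the rule lookup and the task hook as functions keeps the sketch independent of how rules are stored and of the elided API definition.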

Kafka Connect will do nothing if the broker does not support the Admin API methods to check for the existence of a topic and to create a new topic. This is equivalent to relying upon auto-topic creation.

Note that the Java API does not allow the connector to override the creation policy for a topic. The creation policy can be set only in the connector configuration.

Compatibility, Deprecation, and Migration Plan

...