...
topic.creation.failRule.regex=.*
topic.creation.failRule.policy=fail
Java API
To allow source connector implementations may the ability to validate or override some or all of these topic-specific settings, we will modify the following existing abstract class in the Kafka Connect public API:
org.apache.kafka.connect.source.SourceTask
by adding a non-abstract method with the following signature that will by default simply return the input TopicSettings:
Code Block | ||||
---|---|---|---|---|
| ||||
public abstract class SourceTask implements Task { ... /** * Determine the topic-specific settings for a new topic to which the {@link SourceRecord} {@link #poll() produced by this task} * are to be written. This method is called whenever Connect sees a topic in the {@link SourceRecord}s for the first time and when * verifies that the topic does not already exist. * <p> * By default this method simply returns the supplied initial settings. Implementations can override this method * to set the topic-specific settings that should be used when creating the new topic. The broker's own * topic-specific configuration settings will be used as defaults for any settings not set via the resulting object. * </p> * * @param settings the initial settings; never null * @param currentClusterSize the current number of brokers in the cluster, which can be used as an upper limit on the replication factor; always positive * @return the topic-specific settings; may be null if Connect should not attempt to create the topic (and potentially rely upon the broker auto-creating the topic) */ public TopicSettings settingsForNewTopic(TopicSettings settings, int currentClusterSize) { return settings; } } |
...