Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

topic.creation.failRule.regex=.*
topic.creation.failRule.policy=fail


Java API

To allow source connector implementations may the ability to validate or override some or all of these topic-specific settings, we will modify the following existing abstract class in the Kafka Connect public API:
org.apache.kafka.connect.source.SourceTask
by adding a non-abstract method with the following signature that will by default simply return the input TopicSettings:


Code Block
languagejava
titleChanges to SourceTask.java
public abstract class SourceTask implements Task {
    ...
    /**
     * Determine the topic-specific settings for a new topic to which the {@link SourceRecord} {@link #poll() produced by this task}
     * are to be written. This method is called whenever Connect sees a topic in the {@link SourceRecord}s for the first time and when
     * verifies that the topic does not already exist.
     * <p>
     * By default this method simply returns the supplied initial settings. Implementations can override this method
     * to set the topic-specific settings that should be used when creating the new topic. The broker's own
     * topic-specific configuration settings will be used as defaults for any settings not set via the resulting object.
     * </p>
     *
     * @param settings the initial settings; never null
     * @param currentClusterSize the current number of brokers in the cluster, which can be used as an upper limit on the replication factor; always positive
     * @return the topic-specific settings; may be null if Connect should not attempt to create the topic (and potentially rely upon the broker auto-creating the topic)
     */
    public TopicSettings settingsForNewTopic(TopicSettings settings, int currentClusterSize) {
        return settings;
    }
}

...