Versions Compared


  • This line was added.
  • This line was removed.
  • Formatting was changed.


This proposal adds a single non-abstract method to the existing org.apache.kafka.connect.source.SourceTask class:

public abstract class SourceTask implements Task {
* Determine the topic-specific settings for a new topic to which the {@link SourceRecord} {@link #poll() produced by this task}
* are to be written. This method is called whenever a new topic is seen in the {@link SourceRecord}s, and sh
* <p>
 * By default this method simply returns the supplied initial settings. Implementations can override this method
* to set the topic-specific settings that should be used when creating the new topic. The broker's own
* topic-specific configuration settings will be used as defaults for any settings not set via the resulting object.
* </p>
* @param settings the initial settings; never null
* @return the topic-specific settings; may be null if the broker should auto-create the topic
  public TopicSettings settingsForNewTopic(TopicSettings settings) { ...
return null;

and adds a new new interface: 



* Topic-specific settings for a new topic.
public interface TopicSettings {
* The log cleanup policy for segments beyond the retention window
enum CleanupPolicy {
* Ensures that Kafka will always retain at least the last known value for each message key within the log of data for a single topic partition.




* Discard old log data after a fixed period of time or when the log reaches some predetermined size.




* {@link #COMPACT Compact} to retain at least the last known value for each message key




         * {@link #DELETE delete} messages after a period of time.




* Get the name of the topic.
* @return the name of the topic
String name();
* Get the number of partitions.
* @return the number of partitions; always positive
int partitions();
* Get the replication factor.
* @return the replication factor; always positive
short replicationFactor();
* Get the cleanup policy.
* @return the cleanup policy; may be null if the broker's default setting for new topics is to be used
CleanupPolicy cleanupPolicy();
* Get the minimum number of in-sync replicas that must exist for the topic to remain available.
* @return the minimum number of in-sync replicas; may be null if the broker's default setting for new topics is to be used


Short minInSyncReplicas();
* Get whether the broker is allowed to elect an unclean leader for the topic.
* @return true if unclean leader election is allowed (potentially leading to data loss), or false otherwise;
* may be null if the broker's default setting for new topics is to be used
Boolean uncleanLeaderElection();
* Get the value for the named topic-specific configuration.
* @param name the name of the topic-specific configuration
* @return the configuration value, or null if the specified configuration has not been set
Object config(String name);
* Get the values for the set topic-specific configuration.
* @return the map of configuration values keyed by configuration name; never null
Map<String, Object> config();
* Specify the desired number of partitions for the topic.
* @param numPartitions the desired number of partitions; must be positive
* @return this settings object to allow methods to be chained; never null
TopicSettings partitions(int


* Specify the desired replication factor for the topic.
* @param replicationFactor the desired replication factor; must be positive
* @return this settings object to allow methods to be chained; never null
TopicSettings replicationFactor(short


* Specify the desired cleanup policy for the topic.
* @param policy the cleanup policy; may not be null
* @return this settings object to allow methods to be chained; never null
TopicSettings cleanupPolicy(CleanupPolicy policy);
* Specify the minimum number of in-sync replicas required for this topic.
* @param minInSyncReplicas the minimum number of in-sync replicas allowed for the topic; must be positive
* @return this settings object to allow methods to be chained; never null
TopicSettings minInSyncReplicas(


short minInSyncReplicas);
* Specify whether the broker is allowed to elect a leader that was not an in-sync replica when no ISRs
* are available.
* @param allow true if unclean leaders can be elected, or false if they are not allowed
* @return this settings object to allow methods to be chained; never null
TopicSettings uncleanLeaderElection(


boolean allow);
* Specify the configuration properties for the topic, overwriting any previously-set properties.
* @param configName the name of the topic-specific configuration property
* @param configValue the value for the topic-specific configuration property
* @return this settings object to allow methods to be chained; never null
TopicSettings config(String configName, Object configValue);
* Specify the configuration properties for the topic, overwriting all previously-set properties,
* including {@link #cleanupPolicy(CleanupPolicy)}, {@link #minInSyncReplicas(short)}, and {@link #uncleanLeaderElection(boolean)}.
* @param configs the desired topic configuration properties, or null if all existing properties should be cleared
* @return this settings object to allow methods to be chained; never null
TopicSettings config(Map<String, Object> configs);




A transformation chain will be configured at the connector-level. The order of transformations is defined by the transforms config which represents a list of aliases. An alias in transforms implies that some additional keys are configurable:
transforms.$alias.type – fully qualified class name for the transformation
transforms.$alias.* – all other keys as defined in Transformation.config() are embedded with this prefix
