...
This proposal adds a single non-abstract method to the existing org.apache.kafka.connect.source.SourceTask
class:
public abstract class SourceTask implements Task { ... /** * Determine the topic-specific settings for a new topic to which the {@link SourceRecord} {@link #poll() produced by this task} * are to be written. This method is called whenever a new topic is seen in the {@link SourceRecord}s, and sh * <p> * By default this method simply returns the supplied initial settings. Implementations can override this method * to set the topic-specific settings that should be used when creating the new topic. The broker's own * topic-specific configuration settings will be used as defaults for any settings not set via the resulting object. * </p> * * @param settings the initial settings; never null * @return the topic-specific settings; may be null if the broker should auto-create the topic */ public TopicSettings settingsForNewTopic(TopicSettings settings) { ... return null; } } |
and adds a new new org.apache.kafka.connect.storage.TopicSettings
interface:
...
/** * Topic-specific settings for a new topic. */ public interface TopicSettings { /** * The log cleanup policy for segments beyond the retention window */ enum CleanupPolicy { /** * Ensures that Kafka will always retain at least the last known value for each message key within the log of data for a single topic partition. */
|
...
...
/** * Discard old log data after a fixed period of time or when the log reaches some predetermined size. */
|
...
...
/** * {@link #COMPACT Compact} to retain at least the last known value for each message key |
...
...
* {@link #DELETE delete} messages after a period of time. */ |
...
COMPACT_AND_DELETE; } /** |
...
* Get the name of the topic. * @return the name of the topic */ String name();
/** * Get the number of partitions. * @return the number of partitions; always positive */ int partitions(); /** * Get the replication factor. * @return the replication factor; always positive */ short replicationFactor(); /** * Get the cleanup policy. * @return the cleanup policy; may be null if the broker's default setting for new topics is to be used */ CleanupPolicy cleanupPolicy(); /** * Get the minimum number of in-sync replicas that must exist for the topic to remain available. * @return the minimum number of in-sync replicas; may be null if the broker's default setting for new topics is to be used |
...
*/ Short minInSyncReplicas();
/** * Get whether the broker is allowed to elect an unclean leader for the topic. * @return true if unclean leader election is allowed (potentially leading to data loss), or false otherwise; * may be null if the broker's default setting for new topics is to be used */ Boolean uncleanLeaderElection(); /** * Get the value for the named topic-specific configuration. * @param name the name of the topic-specific configuration * @return the configuration value, or null if the specified configuration has not been set */ Object config(String name); /** * Get the values for the set topic-specific configuration. * @return the map of configuration values keyed by configuration name; never null */ Map<String, Object> config(); /** * Specify the desired number of partitions for the topic. * * @param numPartitions the desired number of partitions; must be positive * @return this settings object to allow methods to be chained; never null */ TopicSettings partitions(int |
...
numPartitions); /** * Specify the desired replication factor for the topic. * * @param replicationFactor the desired replication factor; must be positive * @return this settings object to allow methods to be chained; never null */ TopicSettings replicationFactor(short |
...
replicationFactor); /** * Specify the desired cleanup policy for the topic. * @param policy the cleanup policy; may not be null * @return this settings object to allow methods to be chained; never null */ TopicSettings cleanupPolicy(CleanupPolicy policy); /** * Specify the minimum number of in-sync replicas required for this topic. * * @param minInSyncReplicas the minimum number of in-sync replicas allowed for the topic; must be positive * @return this settings object to allow methods to be chained; never null */ TopicSettings minInSyncReplicas( |
...
short minInSyncReplicas); /** * Specify whether the broker is allowed to elect a leader that was not an in-sync replica when no ISRs * are available. * * @param allow true if unclean leaders can be elected, or false if they are not allowed * @return this settings object to allow methods to be chained; never null */ TopicSettings uncleanLeaderElection( |
...
boolean allow); /** * Specify the configuration properties for the topic, overwriting any previously-set properties. * * @param configName the name of the topic-specific configuration property * @param configValue the value for the topic-specific configuration property * @return this settings object to allow methods to be chained; never null */ TopicSettings config(String configName, Object configValue); /** * Specify the configuration properties for the topic, overwriting all previously-set properties, * including {@link #cleanupPolicy(CleanupPolicy)}, {@link #minInSyncReplicas(short)}, and {@link #uncleanLeaderElection(boolean)}. * * @param configs the desired topic configuration properties, or null if all existing properties should be cleared * @return this settings object to allow methods to be chained; never null */ TopicSettings config(Map<String, Object> configs); } |
Configuration
A transformation chain will be configured at the connector-level. The order of transformations is defined by the transforms
config which represents a list of aliases. An alias in transforms
implies that some additional keys are configurable:
- transforms.$alias.type
– fully qualified class name for the transformation
- transforms.$alias.*
– all other keys as defined in Transformation.config()
are embedded with this prefix
...