...
While this KIP focuses on Kafka Connect, we propose some common public interfaces and classes that could be used by other parts of Kafka, specifically:
ConfigProvider
,ConfigData
, andConfigContext
ConfigChangeCallback
: These interfaces could potentially be used by the Broker in conjunction with KIP-226 to overlay configuration properties from aConfigProvider
(such as aVaultConfigProvider
) onto existing configuration properties.ConfigTransformer
: This class could potentially be used by other clients (in conjunction with the previous interfaces) to implement variants of KIP-76 and KIP-269.
...
The public interfaces that are not Connect-specific consist of the following:
ConfigProvider
,ConfigData
, andConfigContext
ConfigChangeCallback
: These interfaces are used to abstract a provider of configuration properties.ConfigTransformer
: This class is used to provide variable substitution for a configuration value, by looking up variables (or indirect references) from a set ofConfigProvider
instances. It only provides one level of indirection.
Code Block | ||
---|---|---|
| ||
public interface ConfigProvider extends Configurable, Closeable { // Configure this class with the initialization parameters void configure(Map<String, ?> configs); // Lookup up the data at the given path. ConfigDataMap<String, String> get(ConfigContext ctx, String path); // Lookup up the data with the given keys at the given path. ConfigDataMap<String, String> get(ConfigContext ctx, String path, Set<String> keys); // The ConfigProvider is responsible for making this callback whenever the key changes. // Some ConfigProviders may want to have a background thread with a configurable update interval. void subscribe(String path, Set<String> keykeys, ConfigurationChangeCallbackConfigChangeCallback callback); // Inverse of subscribe void unsubscribe(String path, Set<String> key); // Close all subscriptions and clean up all resources void close(); } public interface ConfigurationChangeCallbackConfigChangeCallback { void onChange(String keypath, Map<String, String> values, Stringint valuedelayMs); } public interface ConfigContext { // The name of the client String name(); // Schedule a reload, possibly for secrets rotation void scheduleConfigReload(long delayMs); } public class ConfigData { private Map<String, String> metadata; private Map<String, String> data; public ConfigData(Map<String, String> metadata, Map<String, String> data) { this.metadata = metadata; this.data = data; } public Map<String, String> metadata() { return metadata; } public Map<String, String> data() { return data; } } |
Also a helper class will be added that can provide variable substitutions using ConfigProvider
instances. Here is an example implementation.
|
Also a helper class will be added that can provide variable substitutions using ConfigProvider
instances. Here is an example skeleton.
Code Block |
---|
/**
* This class wraps a set of {@link ConfigProvider} instances and uses them to perform
* transformations.
*/
public class ConfigTransformer {
private static final Pattern DEFAULT_PATTERN = Pattern.compile("\\$\\{(.*?):(.*?)\\}");
private final Map<String, ConfigProvider> configProviders;
private final Pattern pattern;
public ConfigTransformer(Map<String, ConfigProvider> configProviders) { |
Code Block |
/** * This class wraps a set of {@link ConfigProvider} instances and uses them to perform * transformations. */ public class ConfigTransformer { private static final Logger log = LoggerFactory.getLogger(ConfigTransformer.class); private static final Pattern DEFAULT_PATTERN = Pattern.compile("\\$\\{(.*?):(.*?)\\}"); private final Map<String, ConfigProvider> configProviders; private final Pattern pattern; public ConfigTransformer(Map<String, ConfigProvider> configProviders) { this(configProviders, DEFAULT_PATTERN); } public ConfigTransformer(Map<String, ConfigProvider> configProviders, Pattern pattern) { this.configProviders = configProviders; this.pattern = pattern; } public Map<String, String> transform(ConfigContext ctx, Map<String, String> configs) { Map<String, Set<String>> keysByProvider = new HashMap<>(); Map<String, Map<String, String>> lookupsByProvider = new HashMap<>(); // Collect the variables that need transformation for (Map.Entry<String, String> config : configs.entrySet()) { List<ConfigVariable> vars = getVars(config.getKey(), config.getValue(), pattern); for (ConfigVariable var : vars) { Set<String> keys = keysByProvider.get(var.providerName); if (keys == null) { keys = new HashSet<>(); keysByProvider.put(var.providerName, keys); } keys.add(var.valueVariable); } } // Lookup requested variables from the ConfigProviders for (Map.Entry<String, Set<String>> entry : keysByProvider.entrySet()) { ConfigProvider provider = configProviders.get(entry.getKey()); ConfigData configData = provider.lookup(ctx, null, new HashSet<>(entry.getValue())); Map<String, String> data = configData.data(); lookupsByProvider.put(entry.getKey(), data); } // Perform the transformations Map<String, String> result = new HashMap<>(configs); for (Map.Entry<String, String> config : configs.entrySet()) { result.put(config.getKey(), replace(lookupsByProvider, config.getValue(), pattern)); } return result; } private static List<ConfigVariable> getVars(String key, String value, Pattern pattern) { List<ConfigVariable> configVars = new ArrayList<>(); Matcher matcher = pattern.matcher(value); while (matcher.find()) { configVars.add(new ConfigVariable(matcher.group(1), matcher.group(2))); } return configVars; } private static String replace(Map<String, Map<String, String>> lookupsByProvider, String value, Pattern pattern) { Matcher matcher = pattern.matcher(value); StringBuilder builder = new StringBuilder(); int i = 0; while (matcher.find()) { Map<String, String> map = lookupsByProvider.get(matcher.group(1)); String replacement = map.get(matcher.group(2)); builder.append(value, i, matcher.start()); if (replacement == null) builder.append(matcher.group(0)); else builder.append(replacement); i = matcher.end(); } builder.append(value, i, value.length()); return builder.toString(); } private static class ConfigVariable { final String providerName; final String valueVariablethis(configProviders, DEFAULT_PATTERN); } ConfigVariable(String providerName, String valueVariable) { public ConfigTransformer(Map<String, ConfigProvider> configProviders, Pattern pattern) { this.providerNameconfigProviders = providerNameconfigProviders; this.valueVariablepattern = valueVariablepattern; } public String toString() { return "(" + providerName + ":" + valueVariable + ")";Map<String, String> transform(Map<String, String> configs) { }... } } |
Two existing interfaces that are specific to Connect will be modified. This will allow for Tasks to get the latest versions of their configs with all indirect references reloaded (requires the planned upgrade of Kafka to Java 8).
...
The patterns for variable substitutions are of the form ${provider:[path:]key
}, where only one level of indirection is followed during substitutions. This The path
in the variable is optional. This means if you have the following:
...
Code Block | ||
---|---|---|
| ||
# Properties specified in the Worker config
config.providers=vault # can have multiple comma-separated values
config.provider.vault.class=com.org.apache.connect.configs.VaultConfigProvider
config.provider.vault.param.uri=1.2.3.4
config.provider.vault.param.token=/run/secrets/vault-token
# Properties specified in the Connector config
mysql.db.password=${vault:vault_path:vault_db_password_key} |
In the above example, VaultConfigProvider will be passed the string "/run/secrets/vault-token" on initialization, which could be the filename for a Docker secret containing the initial Vault token, residing on the tmpfs mount, for instance. When resolving the value for "mysql.db.password", the VaultConfigProvider will use the key "vault_db_password_key". The VaultConfigProvider would use this key to look up the corresponding secret. (VaultConfigProvider is a hypothetical example for illustration purposes only.)
...
Code Block |
---|
//** * An implementation of {@link ConfigProvider} that simply uses a Properties file. */ public class FileConfigProvider implements ConfigProvider { private static final Logger log = LoggerFactory.getLogger(FileConfigProvider.class); public final static String FILE_NAME = "filename"; private Properties properties; /** * Configure this class with the initialization parameters */ public void configure(Map<String, ?> configs) { String fileName = (String) configs.get(FILE_NAME); try (FileReader fileReader = new FileReader(fileName)) { properties = new Properties(); properties.load(fileReader); } catch (IOException e) { throw new ConfigException("File name " + fileName + " not found for FileConfigProvider"); } } /** * Lookup up the data at the given path. */ public ConfigDataMap<String, String> get(ConfigContext ctx, String path) { Map<String, String> data = new HashMap<>(); Enumeration<Object> keys = properties.keys(); while (keys.hasMoreElements()) { String key = keys.nextElement().toString(); String value = properties.getProperty(key); if (value != null) { data.put(key, value); } } return new ConfigData(Collections.<String, String>emptyMap(),return new data); } /** * Lookup up the data with the given keys at the given path. */ public ConfigDataMap<String, String> get(ConfigContext ctx, String path, Set<String> keys) { Map<String, String> data = new HashMap<>(); for (String key : keys) { String value = properties.getProperty(key); if (value != null) { data.put(key, value); } } return new ConfigData(Collections.<String, String>emptyMap(), data.put(key, value); } // The ConfigProvider is responsible} for making this callback whenever the key changes.} // Some ConfigProviders may wantreturn todata; have a background thread} with a configurable update interval. public void subscribe(String key, ConfigurationChangeCallbackConfigChangeCallback callback) { throw new UnsupportedOperationException(); } // Inverse of subscribe public void unsubscribe(String key) { throw new UnsupportedOperationException(); } public void close() { } } |
...
- ConfigProvider: The
ConfigProvider
may have knowledge of the method of rotation. For Vault, it would be a "lease duration". For a file-based provider, it could be file watches. If it knows when a secret is going to be reloaded, it would callscheduleConfigReloadonChange()
to inform the Herder. - Herder: The Herder can push information to the Connector indicating that secrets have expired or may expire in the future. When the Herder receives the
scheduleConfigReloadonChange()
call, it will check a new connector configuration propertyconfig.reload.action
which can be one of the following:- The value
restart
, which means to schedule a restart of the Connector and all its Tasks. This will be the default. - The value
none
, which means to do nothing.
- The value
- Connector Tasks: A task may wish to handle rotation on its own (a pull model). In this case the Connector would need to set
config.reload.action
tonone
. The methodsSinkTaskContext.config()
andSourceTaskContext.config()
would be used by the Task to reload the config and resolve indirect references again.
...