...
Two existing interfaces will be modified so that Tasks can get the latest versions of their configs with all indirect references reloaded. This requires the planned upgrade of Kafka to Java 8.
```java
public interface SinkTaskContext {
    ...
    default Map<String, String> config() { ... }
    ...
}

public interface SourceTaskContext {
    ...
    default Map<String, String> config() { ... }
    ...
}
```
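The Java 8 requirement comes from the use of default methods: adding config() as a default method means existing implementations of the context interfaces keep compiling unchanged. The following is a minimal sketch of that idea only. TaskContextSketch, LegacyContext, ReloadingContext, and taskId() are hypothetical names, and the default body (returning an empty map) is an assumption, since the proposal elides the real body.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class DefaultConfigSketch {

    /** Hypothetical stand-in for SinkTaskContext / SourceTaskContext. */
    interface TaskContextSketch {
        // Stand-in for a method that existed before the change (illustrative name).
        String taskId();

        // New method added as a Java 8 default method. The real default body is
        // elided in the proposal; returning an empty map is an assumption.
        default Map<String, String> config() {
            return Collections.emptyMap();
        }
    }

    /** Written before config() existed: still compiles without modification. */
    static class LegacyContext implements TaskContextSketch {
        @Override
        public String taskId() {
            return "legacy-0";
        }
    }

    /** Overrides config() to hand back a snapshot of the latest resolved values. */
    static class ReloadingContext implements TaskContextSketch {
        private final Map<String, String> latest = new HashMap<>();

        @Override
        public String taskId() {
            return "reloading-0";
        }

        void update(String key, String value) {
            latest.put(key, value);
        }

        @Override
        public Map<String, String> config() {
            // Return a read-only copy so callers cannot mutate internal state.
            return Collections.unmodifiableMap(new HashMap<>(latest));
        }
    }

    public static void main(String[] args) {
        TaskContextSketch legacy = new LegacyContext();
        System.out.println(legacy.config().isEmpty()); // prints "true"

        ReloadingContext reloading = new ReloadingContext();
        reloading.update("mysql.db.password", "resolved-secret");
        System.out.println(reloading.config().get("mysql.db.password")); // prints "resolved-secret"
    }
}
```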
...
- Ability to specify one or more custom ConfigProviders that will resolve indirect references for configuration values. The ConfigProviders are plugins and use the same classloading mechanism as other plugins (converters, transformers, etc.).
- Ability to pass data to initialize a ConfigProvider on construction or instantiation.
- For indirect references, a special syntax using the dollar sign ($) will be used to indicate when a configuration value is an indirect reference and for which ConfigProvider(s).
...
In the above example, VaultConfigProvider will be passed the string "/run/secrets/vault-token" on initialization, which could be, for instance, the filename of a Docker secret on the tmpfs mount that contains the initial Vault token. When resolving the value for "mysql.db.password", the VaultConfigProvider will use the key "vault_db_password_key" to look up the corresponding secret.
Secret Rotation
Secret Management systems such as Vault support secret rotation by associating a "lease duration" with a secret, which can be read by the client.
...
- ConfigProvider: The ConfigProvider may have knowledge of the method of rotation. For Vault, it would be a "lease duration". For a file-based provider, it could be file watches. If it knows when a secret is going to be reloaded, it would call scheduleConfigReload() to inform the Herder.
- Herder: The Herder can push information to the Connector indicating that secrets have expired or may expire in the future. When the Herder receives the scheduleConfigReload() call, it will check a new connector configuration property, config.reload.action, which can be one of the following:
  - The value restart, which means to schedule a restart of the Connector and all its Tasks. This will be the default.
  - The value none, which means to do nothing.
- Connector Tasks: A Task may wish to handle rotation on its own (a pull model). In this case the Connector would need to set config.reload.action to none, so that a scheduleConfigReload() call does not trigger a restart. The methods SinkTaskContext.config() and SourceTaskContext.config() will be added so that the Task can ask the framework to reload the config and resolve indirect references again.
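The Herder's decision described above can be sketched as follows. This is an illustration under stated assumptions, not the framework's implementation: HerderSketch, the scheduleConfigReload() parameter list (connector name, connector config, delay), and the scheduledRestarts list are hypothetical, and exact lowercase matching of the property value is assumed. Only the property name config.reload.action and its two values, restart (the default) and none, come from the proposal.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ReloadActionSketch {

    static final String RELOAD_ACTION_CONFIG = "config.reload.action";

    enum ReloadAction { RESTART, NONE }

    /** Reads config.reload.action, falling back to restart when it is unset. */
    static ReloadAction reloadAction(Map<String, String> connectorConfig) {
        String raw = connectorConfig.get(RELOAD_ACTION_CONFIG);
        if (raw == null || raw.equals("restart")) {
            return ReloadAction.RESTART;
        }
        if (raw.equals("none")) {
            return ReloadAction.NONE;
        }
        throw new IllegalArgumentException("Unknown " + RELOAD_ACTION_CONFIG + ": " + raw);
    }

    /** Hypothetical Herder that records restarts instead of performing them. */
    static class HerderSketch {
        final List<String> scheduledRestarts = new ArrayList<>();

        // Assumed signature: the proposal does not show the real parameters.
        void scheduleConfigReload(String connector, Map<String, String> connectorConfig, long delayMs) {
            if (reloadAction(connectorConfig) == ReloadAction.RESTART) {
                scheduledRestarts.add(connector + "@" + delayMs + "ms");
            }
            // For NONE, nothing happens: Tasks pull fresh config themselves.
        }
    }

    public static void main(String[] args) {
        HerderSketch herder = new HerderSketch();

        Map<String, String> pushModel = new HashMap<>(); // property unset: default applies
        Map<String, String> pullModel = new HashMap<>();
        pullModel.put(RELOAD_ACTION_CONFIG, "none");

        herder.scheduleConfigReload("jdbc-sink", pushModel, 60_000L);
        herder.scheduleConfigReload("es-sink", pullModel, 60_000L);

        System.out.println(herder.scheduledRestarts); // prints "[jdbc-sink@60000ms]"
    }
}
```

Only the connector that leaves the property at its default is queued for a restart; the connector that sets none is left alone, which matches the pull model in which its Tasks re-read their config.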
ConfigProvider Example: Vault
Vault has three different items that can be rotated:
- Key rotation: Vault uses a keyring, so even if you rotate the keys, new data will be encrypted by the new key, but old data will still be decryptable with the old keys. So rotating Vault keys does not have an impact on clients.
- Token rotation: A Vault token is used for authentication and may expire. In this case the VaultConfigProvider will be aware of the lease duration of the token, and will need to request a new token before the old one expires. In this way token expiration is transparent to the Connector and is a concern of the ConfigProvider only.
- Secret rotation: A secret stored in Vault may be rotated, such as a JDBC or Elasticsearch password. In this case, the VaultConfigProvider will inform the Herder via a scheduleConfigReload() call. The Herder in turn will check config.reload.action to determine whether to:
  - Schedule a restart of the Connector and all its Tasks (the default).
  - Do nothing (in which case the Tasks themselves will have to deal with the rotation).
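For the token rotation case, the provider has to decide when to request a new token relative to the lease duration. The sketch below shows one possible way to track that. TokenLeaseSketch and its safety margin are assumptions made for illustration; the proposal only states that the token must be renewed before it expires, and no Vault client API is used here.

```java
import java.time.Duration;
import java.time.Instant;

class TokenLeaseSketch {
    private final Instant issuedAt;
    private final Duration leaseDuration;
    private final Duration safetyMargin; // assumed: how early to renew before expiry

    TokenLeaseSketch(Instant issuedAt, Duration leaseDuration, Duration safetyMargin) {
        this.issuedAt = issuedAt;
        this.leaseDuration = leaseDuration;
        this.safetyMargin = safetyMargin;
    }

    /** The instant at which the token stops being valid. */
    Instant expiresAt() {
        return issuedAt.plus(leaseDuration);
    }

    /** The instant from which the provider should request a new token. */
    Instant renewAt() {
        return expiresAt().minus(safetyMargin);
    }

    /** True once the renewal point has been reached or passed. */
    boolean shouldRenew(Instant now) {
        return !now.isBefore(renewAt());
    }

    public static void main(String[] args) {
        Instant issued = Instant.parse("2018-01-01T00:00:00Z");
        TokenLeaseSketch lease =
                new TokenLeaseSketch(issued, Duration.ofHours(1), Duration.ofMinutes(5));

        System.out.println(lease.renewAt()); // prints "2018-01-01T00:55:00Z"
        System.out.println(lease.shouldRenew(issued.plus(Duration.ofMinutes(30)))); // prints "false"
        System.out.println(lease.shouldRenew(issued.plus(Duration.ofMinutes(56)))); // prints "true"
    }
}
```

Because renewal is handled entirely inside the provider, nothing in this sketch touches the Herder or the Connector, which is what makes token expiration transparent to them.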
Connector Example: Elasticsearch
...
...
Compatibility, Deprecation, and Migration Plan
...