Status

Current state: PR review is done; KIP review is in progress

Motivation

Client applications use SSL/TLS to secure communication with Kafka brokers. A client initiates the connection using an SSL engine built from the ssl.* properties, which point to the key store and trust store. In most cases the client key store contains a single key. When it contains multiple keys, the client must present the right one; otherwise the SSL handshake or the authorization on the broker can fail. This proposal adds an optional configuration that selects the key to use, by alias, when the key store is loaded for secured communication with Kafka brokers.

Proposed Changes

  • Add the following config constants to SslConfigs.java:


SslConfigs.java
public static final String SSL_KEYSTORE_ALIAS_CONFIG = "ssl.keystore.alias";
public static final String SSL_KEYSTORE_ALIAS_DOC = "This config is used to pick the named alias from the keystore to build the SSL engine and authenticate the client with the broker. " +
"This is an optional config, used only when the keystore contains multiple keys and you need to control which key is presented to the server.";


  • Add a new method applyAliasToKM to DefaultSslEngineFactory.java. The method is described below.

This method wraps each KeyManager so that the configured client key alias is always the one chosen during the handshake.

When no alias is specified

In this scenario, the key managers are returned unchanged and the default key selection applies, so the first entry in the keystore is used for the SSL handshake.

When an alias is not found in the key store

In this scenario, the SSL handshake fails with an exception.
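Which of these scenarios applies to a given keystore can be checked before any handshake takes place. The following is a minimal sketch and not part of the proposed change: the class and method names (AliasCheck, describe, emptyKeyStore) are hypothetical, and only the standard java.security.KeyStore API is used.

```java
import java.security.KeyStore;
import java.security.KeyStoreException;

public class AliasCheck {
    // Hypothetical helper: reports which scenario applies for the given alias.
    public static String describe(KeyStore keyStore, String alias) {
        if (alias == null || alias.isEmpty()) {
            return "no alias: default key selection applies";
        }
        try {
            // isKeyEntry is true only for key entries, not for trusted certificates.
            if (!keyStore.isKeyEntry(alias)) {
                return "alias not found: handshake is expected to fail";
            }
        } catch (KeyStoreException e) {
            // Thrown when the keystore has not been loaded yet.
            throw new IllegalStateException("Keystore is not initialized", e);
        }
        return "alias found: this key would be presented";
    }

    // Empty in-memory PKCS12 keystore, used only so the sketch runs without files.
    public static KeyStore emptyKeyStore() {
        try {
            KeyStore keyStore = KeyStore.getInstance("PKCS12");
            keyStore.load(null, null);
            return keyStore;
        } catch (Exception e) {
            throw new IllegalStateException("Could not create keystore", e);
        }
    }
}
```

With a real keystore loaded from ssl.keystore.location, the same check could be used to log a clear message at startup instead of waiting for the handshake to fail.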

applyAliasToKM
private KeyManager[] applyAliasToKM(KeyManager[] kms, final String alias) {
        // No alias configured: keep the default key managers unchanged.
        if (alias == null || alias.isEmpty()) {
            return kms;
        }

        log.info("Applying the custom KeyManagers for alias: {}", alias);

        KeyManager[] updatedKMs = new KeyManager[kms.length];

        int i = 0;
        for (KeyManager km : kms) {
            // Only X509 key managers can be wrapped; pass any other type through unchanged.
            if (!(km instanceof X509KeyManager)) {
                updatedKMs[i++] = km;
                continue;
            }
            final X509KeyManager origKM = (X509KeyManager) km;
            // Client alias selection always returns the configured alias;
            // every other call delegates to the original key manager.
            X509ExtendedKeyManager exKM = new X509ExtendedKeyManager() {
                @Override
                public String chooseEngineClientAlias(String[] keyTypes,
                                                      Principal[] issuers, SSLEngine engine) {
                    return alias;
                }

                @Override
                public String chooseClientAlias(String[] keyTypes, Principal[] issuers, Socket socket) {
                    return alias;
                }

                @Override
                public String[] getClientAliases(String keyType, Principal[] issuers) {
                    return origKM.getClientAliases(keyType, issuers);
                }

                @Override
                public String[] getServerAliases(String keyType, Principal[] issuers) {
                    return origKM.getServerAliases(keyType, issuers);
                }

                @Override
                public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket) {
                    return origKM.chooseServerAlias(keyType, issuers, socket);
                }

                @Override
                public X509Certificate[] getCertificateChain(String keyAlias) {
                    return origKM.getCertificateChain(keyAlias);
                }

                @Override
                public PrivateKey getPrivateKey(String keyAlias) {
                    return origKM.getPrivateKey(keyAlias);
                }
            };
            updatedKMs[i++] = exKM;
        }
        return updatedKMs;
    }


  • Initialize the SSL Context with the custom key manager that was loaded previously.


sslContext.init(applyAliasToKM(keyManagers, (String)configs.get(SslConfigs.SSL_KEYSTORE_ALIAS_CONFIG)), trustManagers, this.secureRandomImplementation);


  • Update the private SSLContext createSSLContext(SecurityStore keystore, SecurityStore truststore, Map<String, ?> configs) method in DefaultSslEngineFactory.java to accept the additional configs parameter.
  • Pass the additional configs when creating the context: this.sslContext = createSSLContext(keystore, truststore, configs);
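The flow described in the steps above (read the alias from the configs, wrap the key managers, initialize the context) can be sketched outside of Kafka with plain JSSE classes. This is an illustration under stated assumptions, not the code from the PR: AliasContextSketch, PinnedAliasKeyManager, wrap, createContext and emptyKeyStore are hypothetical names, a java.security.KeyStore stands in for Kafka's SecurityStore, and the default trust managers and SecureRandom are used.

```java
import java.net.Socket;
import java.security.KeyStore;
import java.security.Principal;
import java.security.PrivateKey;
import java.security.cert.X509Certificate;
import java.util.Map;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.X509ExtendedKeyManager;
import javax.net.ssl.X509KeyManager;

public class AliasContextSketch {
    /** Pins client alias selection to one alias; delegates all other calls. */
    static final class PinnedAliasKeyManager extends X509ExtendedKeyManager {
        private final X509KeyManager delegate;
        private final String alias;

        PinnedAliasKeyManager(X509KeyManager delegate, String alias) {
            this.delegate = delegate;
            this.alias = alias;
        }

        @Override public String chooseEngineClientAlias(String[] t, Principal[] i, SSLEngine e) { return alias; }
        @Override public String chooseClientAlias(String[] t, Principal[] i, Socket s) { return alias; }
        @Override public String[] getClientAliases(String t, Principal[] i) { return delegate.getClientAliases(t, i); }
        @Override public String[] getServerAliases(String t, Principal[] i) { return delegate.getServerAliases(t, i); }
        @Override public String chooseServerAlias(String t, Principal[] i, Socket s) { return delegate.chooseServerAlias(t, i, s); }
        @Override public X509Certificate[] getCertificateChain(String a) { return delegate.getCertificateChain(a); }
        @Override public PrivateKey getPrivateKey(String a) { return delegate.getPrivateKey(a); }
    }

    /** Wraps X509 key managers when an alias is set; otherwise returns the input unchanged. */
    public static KeyManager[] wrap(KeyManager[] kms, String alias) {
        if (alias == null || alias.isEmpty()) {
            return kms;
        }
        KeyManager[] wrapped = new KeyManager[kms.length];
        for (int i = 0; i < kms.length; i++) {
            wrapped[i] = kms[i] instanceof X509KeyManager
                    ? new PinnedAliasKeyManager((X509KeyManager) kms[i], alias)
                    : kms[i];
        }
        return wrapped;
    }

    /** Reads the alias from the configs and initializes a TLS context with the wrapped key managers. */
    public static SSLContext createContext(KeyStore keyStore, char[] keyPassword, Map<String, ?> configs) {
        try {
            KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            kmf.init(keyStore, keyPassword);
            String alias = (String) configs.get("ssl.keystore.alias");
            SSLContext context = SSLContext.getInstance("TLS");
            // Passing null for trust managers and SecureRandom selects the JSSE defaults.
            context.init(wrap(kmf.getKeyManagers(), alias), null, null);
            return context;
        } catch (Exception e) {
            throw new IllegalStateException("Could not create SSLContext", e);
        }
    }

    /** Empty in-memory PKCS12 keystore, used only so the sketch runs without files. */
    public static KeyStore emptyKeyStore() {
        try {
            KeyStore keyStore = KeyStore.getInstance("PKCS12");
            keyStore.load(null, null);
            return keyStore;
        } catch (Exception e) {
            throw new IllegalStateException("Could not create keystore", e);
        }
    }
}
```

Passing the whole configs map, rather than only the alias, mirrors the signature change proposed above and leaves room for the factory to read further ssl.* settings in the same place.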


Public Interfaces

The only change for clients is a new optional property, ssl.keystore.alias, which names the key alias to use within the key store.

Example configuration
ssl.keystore.alias=<alias.name>
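For clients configured in code rather than in a properties file, the setting might look like the sketch below. The class name AliasClientConfig, the file paths, the passwords and the alias value are placeholders; the other keys are the existing Kafka SSL client settings, and ssl.keystore.alias is the property proposed here.

```java
import java.util.Properties;

public class AliasClientConfig {
    // Builds client properties; paths and passwords are placeholder values.
    public static Properties build(String alias) {
        Properties props = new Properties();
        props.setProperty("security.protocol", "SSL");
        props.setProperty("ssl.keystore.location", "/path/to/client.keystore.jks");
        props.setProperty("ssl.keystore.password", "changeit");
        props.setProperty("ssl.truststore.location", "/path/to/client.truststore.jks");
        props.setProperty("ssl.truststore.password", "changeit");
        // Proposed property: selects which key in the keystore is presented to the broker.
        props.setProperty("ssl.keystore.alias", alias);
        return props;
    }
}
```

The resulting Properties object would be passed to a producer, consumer or admin client constructor in the usual way.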

PR Details: https://github.com/apache/kafka/pull/17560

Compatibility, Deprecation, and Migration Plan

  • The proposed changes are fully backward compatible: the new property is optional, and when it is not set the key managers are used unchanged.

Test Plan

  • The change is verified by JUnit and integration tests.
  • JUnit tests were added for the new methods.

