Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-18573

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Apache Kafka added basic support for machine-to-machine (M2M) OAuth 2.0 flows on the client in KIP-768. Though the OAuth 2.0 family of specifications supports several grant types applicable to M2M applications, as of Apache Kafka 4.0, only the client_credentials grant type is supported. Some organizations are wary of using the current client_credentials grant type implementation since it requires a secret to be specified in plain text in the configuration. For that reason, not all cloud providers support the client_credentials grant type.

Primary Motivator: jwt-bearer Grant Type Support

The focus of this KIP is to introduce support for the OAuth 2.0 urn:ietf:params:oauth:grant-type:jwt-bearer grant type as defined in RFC 7523. RFC 7523 details how a client can authenticate using an assertion (a cryptographically signed JWT defined in RFC 7521) instead of a plain text secret when requesting an OAuth 2.0 token. This reduces potential security compromises as the static secret is not part of the deployment configuration directly and the information used to create the assertion can be rotated as dictated by the organization's policy.

Shorthand

In this document, "JWT Bearer" and "jwt-bearer" are used as shorthand for urn:ietf:params:oauth:grant-type:jwt-bearer.



Here is the basic flow when using the jwt-bearer grant type with private key information:

  1. The Apache Kafka client first reads any configuration and/or data used to create the assertion
  2. A temporary JWT is created on the client that contains claims (e.g. iss, sub, aud, iat, exp, as well as arbitrary custom claims (RFC 7523, section 3))
  3. The assertion is created by the client ("self-signed") using a configured private key (typically with the RS256 algorithm) to sign the JWT (RFC 7515, section 3), thus allowing the client to authenticate without a shared secret
  4. The assertion is exchanged with the identity provider using the HTTP parameters assertion and grant_type with a value of urn:ietf:params:oauth:grant-type:jwt-bearer (RFC 7523, section 2.1)
  5. If the assertion is valid, a JWT is returned that contains claims that are germane to the Apache Kafka broker
  6. As part of the connection handshake process, the JWT is then sent from the client to the broker
  7. Brokers will fetch and cache key sets (JWKS) from the identity provider that contain the information necessary to validate the JWT from the client
  8. The identity provider will return the JWKS for use by the broker
  9. Using the JWKS and the claims in the JWT received from the client, the broker will validate that the JWT contains the expected data
  10. The broker will continue its process for initializing the connection and eventually return success (or failure) to the client
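
Steps 2 through 4 above can be illustrated with a minimal sketch that uses only the JDK. It is not the implementation proposed by this KIP: the class and method names are hypothetical, the header and payload JSON are passed in as ready-made strings, and the key pair is generated in memory purely for demonstration. The verify method stands in for the check an identity provider would conceptually perform with the client's public key.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.Signature;
import java.util.Base64;

// Hypothetical helper, for illustration only.
final class JwtBearerAssertionSketch {

    static final String GRANT_TYPE = "urn:ietf:params:oauth:grant-type:jwt-bearer";

    private static String b64url(byte[] bytes) {
        // JWS uses base64url without padding (RFC 7515).
        return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes);
    }

    /** Steps 2-3: serialize header and claims, then sign "header.payload" with RS256. */
    static String sign(String headerJson, String payloadJson, PrivateKey key)
            throws GeneralSecurityException {
        String signingInput = b64url(headerJson.getBytes(StandardCharsets.UTF_8))
            + "." + b64url(payloadJson.getBytes(StandardCharsets.UTF_8));
        Signature signer = Signature.getInstance("SHA256withRSA"); // JCA name for RS256
        signer.initSign(key);
        signer.update(signingInput.getBytes(StandardCharsets.US_ASCII));
        return signingInput + "." + b64url(signer.sign());
    }

    /** Step 4: form-encoded body carrying the grant_type and assertion parameters. */
    static String tokenRequestBody(String assertion) {
        return "grant_type=" + URLEncoder.encode(GRANT_TYPE, StandardCharsets.UTF_8)
            + "&assertion=" + URLEncoder.encode(assertion, StandardCharsets.UTF_8);
    }

    /** Signature check over "header.payload" using the matching public key. */
    static boolean verify(String assertion, PublicKey key) throws GeneralSecurityException {
        int lastDot = assertion.lastIndexOf('.');
        Signature verifier = Signature.getInstance("SHA256withRSA");
        verifier.initVerify(key);
        verifier.update(assertion.substring(0, lastDot).getBytes(StandardCharsets.US_ASCII));
        return verifier.verify(Base64.getUrlDecoder().decode(assertion.substring(lastDot + 1)));
    }

    /** Demo-only key pair; a real client would load its key from configuration. */
    static KeyPair newDemoKeyPair() throws GeneralSecurityException {
        KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
        generator.initialize(2048);
        return generator.generateKeyPair();
    }
}
```

Note that the private key never leaves the client; only the signed assertion is sent to the identity provider.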

This OAuth flow allows clients to authenticate using an assertion instead of relying on client credentials or refresh tokens. Because this mechanism does not require long-lived client secrets, the Kafka ecosystem would benefit from the improved security and simplified credential management. Additionally, JWTs allow embedding structured claims, making them useful for role-based access control (RBAC) and multi-tenant deployments. As it is an industry standard, the JWT Bearer grant type is supported by many identity providers; in some cases, the JWT Bearer grant type may be the only grant type a given identity provider supports.

Secondary Motivator: Extensibility

A secondary motivation for this KIP is to refactor the existing code for future OAuth 2.0 work. Much of the existing implementation of the OAuth support is currently hidden inside internal packages that are not meant for outside usage. This has burdened users that want to extend Kafka's OAuth support to enable specific use cases; it forces them to either reinvent much of the logic or tie their code to the internal APIs. It has been noted that downstream projects are already coding against these internal APIs, so exposing some basic building blocks could be useful to avoid toil. So in addition to supporting the jwt-bearer grant type, many of the changes in this KIP are designed to allow future extension of the OAuth support with less encumbrance.

Reuse of Existing OAuth Primitives

The addition of support for the JWT Bearer grant type flow and associated refactoring builds upon the other services and primitives in the existing Kafka OAuth layer. The following are still applicable for the new JWT Bearer support:

  • Cluster operators can continue to use the connections.max.reauth.ms configuration to trigger token refresh on the client
    • Client token refresh will use the JWT Bearer logic instead of the Client Credentials flow
  • Tokens acquired via the JWT Bearer flow still use the OAuth JWT validation mechanisms in the broker

The JWT Bearer work is additive, not a replacement.

Public Interfaces

Classes

All classes, interfaces, and exceptions reside in the org.apache.kafka.common.security.oauthbearer package.

JwtRetriever

JwtRetriever
/**
 * An implementation of <code>JwtRetriever</code> is the means by which the login module will
 * retrieve an OAuth JWT that is used to authorize with a broker. The implementation may
 * involve communicating with remote systems, local files, or reading contents of a
 * configuration setting.
 *
 * <p/>
 *
 * <i>Retrieval</i> of a token is a separate concern from <i>validation</i>.
 * <code>JwtRetriever</code> implementations should not validate the integrity of the
 * token, but should rely on the companion {@link JwtValidator} for that task.
 * 
 * <p/>
 *
 * Because a <code>JwtRetriever</code> is instantiated from its class name, the
 * implementation must include a no-argument constructor. This interface extends from
 * {@link OAuthBearerConfigurable}, which is analogous to {@link Configurable} in that
 * implementations are expected to wait until that method is called to create any
 * resources. Any such resources should be closed in {@link #close()}.
 */
public interface JwtRetriever extends OAuthBearerConfigurable {

    /**
     * Retrieves a JWT in its serialized three-part form. The implementation is free to
     * determine how it should be retrieved but should not perform validation on the result.
     *
     * <p/>
     *
     * <b>Note</b>: This is a blocking function and callers should be aware that the
     * implementation may be communicating over a network, with the file system, coordinating
     * threads, etc. The facility in the {@link LoginModule} from which this is ultimately
     * called does not provide an asynchronous approach.
     *
     * @return Non-<code>null</code> JWT string
     *
     * @throws JwtRetrieverException For errors related to I/O, parsing, etc. during retrieval
     */
    String retrieve() throws JwtRetrieverException;

    /**
     * Closes any resources used by this implementation. The default implementation of
     * this method is a no op, for convenience to implementors.
     */
    @Override
    default void close() throws IOException {
        // Do nothing...
    }
}

The following are stock implementations of JwtRetriever that can be used by configuring sasl.oauthbearer.jwt.retriever.class. The following classes do not expose any public methods other than those in JwtRetriever.

ClientCredentialsJwtRetriever

This implementation of JwtRetriever communicates with an identity provider directly via HTTP. It posts client credentials that are derived from the sasl.oauthbearer.client.credentials.client.id and sasl.oauthbearer.client.credentials.client.secret configuration by encoding them in the standard HTTP Authorization header. The optional sasl.oauthbearer.scope configuration value is included in the OAuth scope HTTP parameter, if set. The URL to which this information is sent is set in the sasl.oauthbearer.token.endpoint.url configuration.

Existing internal class

The logic used for this class has been present in Apache Kafka since 3.5.0. It is being promoted from an internal class to a public class for use in sasl.oauthbearer.jwt.retriever.class.

DefaultJwtRetriever

Per its name, this class serves as the default implementation of JwtRetriever that will be used if sasl.oauthbearer.jwt.retriever.class is not configured. It provides backward compatibility with previous versions of Kafka that did not let the user choose the JwtRetriever. At runtime, depending on the configuration, this class will create and delegate to an instance of one of these implementations:

  • ClientCredentialsJwtRetriever
  • FileJwtRetriever
  • JwtBearerJwtRetriever

The following flowchart shows the logic to select the correct delegate:


FileJwtRetriever

This JwtRetriever implementation interprets the sasl.oauthbearer.token.endpoint.url configuration as a local filesystem path, e.g. file:/super/secure/jwt.txt. At runtime, the contents of the file are read and assumed to be a serialized JWT.

Existing internal class

The logic used for this class has been present in Apache Kafka since 3.5.0. It is being promoted from an internal class to a public class for use in sasl.oauthbearer.jwt.retriever.class.

JwtBearerJwtRetriever

This implementation of JwtRetriever communicates with an identity provider directly via HTTP. It builds a client-signed assertion from the configuration and posts it in the assertion HTTP parameter. The optional sasl.oauthbearer.scope configuration value is included in the OAuth scope HTTP parameter, if set. The URL to which this information is sent is set in the sasl.oauthbearer.token.endpoint.url configuration.

JwtValidator

JwtValidator
/**
 * An instance of <code>JwtValidator</code> acts as a function object that, given a String in
 * base 64-encoded JWT format, can parse the data, perform validation, and construct an
 * {@link OAuthBearerToken} for use by the caller.
 *
 * <p/>
 *
 * The primary reason for this abstraction is that client and broker may have different libraries
 * available to them to perform these operations. Additionally, the exact steps for validation may
 * differ between implementations. To put this more concretely: the implementation in the Kafka
 * client does not bundle a robust library to perform this logic, and it is not the
 * responsibility of the client to perform rigorous validation. However, the Kafka broker ships with
 * a richer set of library dependencies that can perform more substantial validation and is also
 * expected to perform a trust-but-verify test of the token's signature.
 *
 * See:
 *
 * <ul>
 *     <li><a href="https://datatracker.ietf.org/doc/html/rfc6749#section-1.4">RFC 6749, Section 1.4</a></li>
 *     <li><a href="https://datatracker.ietf.org/doc/html/rfc6750#section-2.1">RFC 6750, Section 2.1</a></li>
 *     <li><a href="https://datatracker.ietf.org/doc/html/draft-ietf-oauth-access-token-jwt">JSON Web Token (JWT) Profile for OAuth 2.0 Access Tokens</a></li>
 * </ul>
 *
 * @see DefaultJwtValidator   Default validator that acts as a wrapper over one of the other validators
 * @see ClientJwtValidator    A basic JwtValidator used by client-side login authentication
 * @see BrokerJwtValidator    A more robust JwtValidator that is used on the broker
 *                            to validate the token's contents and verify the signature
 */
public interface JwtValidator extends OAuthBearerConfigurable {

    /**
     * Accepts an OAuth JWT in base 64-encoded format, validates, and returns an
     * OAuthBearerToken.
     *
     * @param token Non-<code>null</code> String in JWT format 
     *
     * @return {@link OAuthBearerToken}
     *
     * @throws JwtValidatorException Thrown on errors performing validation of given token
     */
    OAuthBearerToken validate(String token) throws JwtValidatorException;

    /**
     * Closes any resources used by this implementation. The default implementation of
     * this method is a no op, for convenience to implementors.
     */
    @Override
    default void close() throws IOException {
        // Do nothing...
    }
}

The following are stock implementations of JwtValidator that can be used by configuring sasl.oauthbearer.jwt.validator.class appropriately. The following classes do not expose any public methods other than those in JwtValidator.

BrokerJwtValidator

This implementation of JwtValidator is used by the broker to perform more extensive validation of the JWT that is received from the client, which the client in turn obtained from the identity provider.

The validation steps performed (primarily by the jose4j library) are:

  1. Basic structural validation of the b64token value as defined in RFC 6750 Section 2.1
  2. Basic conversion of the token into an in-memory claims map
  3. Presence of scope, exp, sub, iss, and iat claims
  4. Validation of the signature against the key, identified by kid, from the OAuth/OIDC provider's JWKS


Existing internal class

The logic used for this class has been present in Apache Kafka since 3.5.0. It is being promoted from an internal class to a public class for use in sasl.oauthbearer.jwt.validator.class.

ClientJwtValidator

This implementation of JwtValidator is used by the client to perform some rudimentary validation of the JWT that is received as part of the response from posting the client credentials to the identity provider.

The validation steps performed are:

  1. Basic structural validation of the b64token value as defined in RFC 6750 Section 2.1
  2. Basic conversion of the token into an in-memory claims map
  3. Presence of scope, exp, sub, and iat claims


Existing internal class

The logic used for this class has been present in Apache Kafka since 3.5.0. It is being promoted from an internal class to a public class for use in sasl.oauthbearer.jwt.validator.class.

DefaultJwtValidator

This class is the default implementation of JwtValidator that is used if sasl.oauthbearer.jwt.validator.class is not configured. It provides backward compatibility with previous versions of Kafka that did not let the user choose the JwtValidator. At runtime, depending on the configuration, this class will create and delegate to an instance of one of these implementations:

  • BrokerJwtValidator
  • ClientJwtValidator

The following flowchart shows the logic to select the correct delegate:



Existing internal class

The logic used for this class has been present in Apache Kafka since 3.5.0. It is being promoted from an internal class to a public class for use in sasl.oauthbearer.jwt.validator.class.

JwtRetrieverException

JwtRetrieverException
/**
 * A {@code JwtRetrieverException} is thrown in cases where the JWT cannot be retrieved.
 *
 * @see JwtRetriever#retrieve()
 */
public class JwtRetrieverException extends KafkaException {

    public JwtRetrieverException(String message) {
        super(message);
    }

    public JwtRetrieverException(Throwable cause) {
        super(cause);
    }

    public JwtRetrieverException(String message, Throwable cause) {
        super(message, cause);
    }
}

JwtValidatorException

JwtValidatorException
/**
 * A {@code JwtValidatorException} is thrown in cases where the validity of a JWT cannot be
 * determined. It is intended to be used when errors arise within the processing of a
 * {@link CallbackHandler#handle(Callback[])}. This error, however, is not thrown from that
 * method directly.
 *
 * @see JwtValidator#validate(String)
 */
public class JwtValidatorException extends KafkaException {

    public JwtValidatorException(String message) {
        super(message);
    }

    public JwtValidatorException(Throwable cause) {
        super(cause);
    }

    public JwtValidatorException(String message, Throwable cause) {
        super(message, cause);
    }
}

OAuthBearerConfigurable

OAuthBearerConfigurable
/**
 * Analogue to {@link Configurable} for OAuth-based authentication. This interface presents a similar
 * method signature as that of the {@link AuthenticateCallbackHandler} interface. However, this interface is
 * needed because {@link AuthenticateCallbackHandler} extends the JDK's {@link CallbackHandler} interface.
 *
 * <p/>
 *
 * <em>Note</em>:
 *
 * <ol>
 *   <li>
 *     Any class that <em>implements</em> this interface should initialize resources via
 *     {@link #configure(Map, List)} and release them via {@link #close()}.
 *   </li>
 *   <li>
 *     Any class that <em>instantiates</em> an object that implements {@code OAuthBearerConfigurable}
 *     must properly call that object's ({@link #configure(Map, List)} and {@link #close()}) methods
 *     so that the object can initialize and release resources. 
 *   </li>
 * </ol>
 */
public interface OAuthBearerConfigurable extends Closeable {

    /**
     * Configures this object for the specified SASL mechanism.
     *
     * @param configs Key-value pairs containing the parsed configuration options of
     *        the client or broker. Note that these are the Kafka configuration options
     *        and not the JAAS configuration options. JAAS config options may be obtained
     *        from `jaasConfigEntries`. For configs that may be specified as both Kafka config
     *        as well as JAAS config (e.g. sasl.kerberos.service.name), the configuration
     *        is treated as invalid if conflicting values are provided.
     * @param jaasConfigEntries JAAS configuration entries from the JAAS login context.
     *        This list contains a single entry for clients and may contain more than
     *        one entry for brokers if multiple mechanisms are enabled on a listener using
     *        static JAAS configuration where there is no mapping between mechanisms and
     *        login module entries. In this case, implementations can use the login module in
     *        `jaasConfigEntries` to identify the entry corresponding to `saslMechanism`.
     *        Alternatively, dynamic JAAS configuration option
     *        {@link org.apache.kafka.common.config.SaslConfigs#SASL_JAAS_CONFIG} may be
     *        configured on brokers with listener and mechanism prefix, in which case
     *        only the configuration entry corresponding to `saslMechanism` will be provided
     *        in `jaasConfigEntries`.
     */
    void configure(Map<String, ?> configs, List<AppConfigurationEntry> jaasConfigEntries);
}

Configuration

Broker Naming Convention

When used for inter-broker communication, the configuration name is prefixed with the listener name:

listener.name.<listener name>.oauthbearer.

So if a broker has a listener named external...

sasl.oauthbearer.token.endpoint.url

...would become...

listener.name.external.oauthbearer.sasl.oauthbearer.token.endpoint.url
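
The naming rule above amounts to a simple string concatenation, sketched below. The class and method names are hypothetical, and lower-casing the listener name is an assumption that mirrors how Kafka generally builds listener prefixes.

```java
import java.util.Locale;

// Hypothetical helper, for illustration only.
final class ListenerConfigNames {

    /** Prepends "listener.name.<listener name>.oauthbearer." to a configuration name. */
    static String forListener(String listenerName, String configName) {
        // Assumption: the listener name is lower-cased in the prefix.
        return "listener.name." + listenerName.toLowerCase(Locale.ROOT)
            + ".oauthbearer." + configName;
    }
}
```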

Configuration Listing

The following table lists the configuration that is being introduced in this KIP.

Configuration | Type | Required | Default | Description
sasl.oauthbearer.assertion.algorithm | Type: String | Required: No | Default: RS256

The algorithm the client uses to sign the assertion sent to the identity provider. It is also the value of the alg (Algorithm) header in the JWT assertion.

Valid options are:

  • RS256
  • ES256


Mutually exclusive configuration

If the sasl.oauthbearer.assertion.file is used, this configuration will be ignored.

sasl.oauthbearer.assertion.claim.aud | Type: String | Required: No | Default: (none)

The value to be used as the aud (Audience) claim which will be included in the client assertion created locally.

Mutually exclusive configuration

If the sasl.oauthbearer.assertion.file is used, this configuration will be ignored.

sasl.oauthbearer.assertion.claim.exp.seconds | Type: Int | Required: No | Default: 300

The number of seconds from the current time until the JWT expires. Used to determine the exp (Expiration) claim based on the current system time when the JWT is created. The formula is very simple:

let x = the current timestamp in seconds
let y = the value of this configuration

then

exp = $x + $y

Mutually exclusive configuration

If the sasl.oauthbearer.assertion.file is used, this configuration will be ignored.

sasl.oauthbearer.assertion.claim.iss | Type: String | Required: No | Default: (none)

The value to be used as the iss (Issuer) claim which will be included in the client assertion created locally.

Mutually exclusive configuration

If the sasl.oauthbearer.assertion.file is used, this configuration will be ignored.

Mutually exclusive configuration

If the sasl.oauthbearer.assertion.template.file is used, this configuration will override the corresponding value from the template file.

sasl.oauthbearer.assertion.claim.jti.include | Type: Boolean | Required: No | Default: false

This flag determines if the assertion should add a unique ID for the jti claim.

Mutually exclusive configuration

If the sasl.oauthbearer.assertion.file is used, this configuration will be ignored.

sasl.oauthbearer.assertion.claim.nbf.seconds | Type: Int | Required: No | Default: 60

The number of seconds before the current time from which the JWT is considered valid. Used to determine the nbf (Not Before) claim based on the current system time when the JWT is created. The formula is very simple:

let x = the current timestamp in seconds
let y = the value of this configuration

then

nbf = $x - $y

Mutually exclusive configuration

If the sasl.oauthbearer.assertion.file is used, this configuration will be ignored.
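
The exp and nbf formulas above can be sketched as follows. This is only an illustration under stated assumptions: the class name is hypothetical, and a java.time.Clock is injected so the arithmetic can be checked against a fixed time.

```java
import java.time.Clock;

// Hypothetical holder for the time-based claims, for illustration only.
final class AssertionTimeClaims {
    final long iat;
    final long exp;
    final long nbf;

    AssertionTimeClaims(Clock clock, int expSeconds, int nbfSeconds) {
        long now = clock.instant().getEpochSecond(); // current timestamp in seconds
        this.iat = now;
        this.exp = now + expSeconds;  // sasl.oauthbearer.assertion.claim.exp.seconds
        this.nbf = now - nbfSeconds;  // sasl.oauthbearer.assertion.claim.nbf.seconds
    }
}
```

With the defaults (300 and 60) and a current time of 1741121401, this yields exp = 1741121701 and nbf = 1741121341.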

sasl.oauthbearer.assertion.claim.sub | Type: String | Required: No | Default: (none)

The value to be used as the sub (Subject) claim which will be included in the client assertion created locally.

Mutually exclusive configuration

If the sasl.oauthbearer.assertion.file is used, this configuration will be ignored.

sasl.oauthbearer.assertion.file | Type: String | Required: No | Default: (none)

Optional name of a local file on disk that contains a pre-generated assertion.

The underlying implementation may choose to cache the file contents to avoid the performance hit of loading the file on each access. If so, it's advisable, but not required, to have a mechanism to detect when the file changes to allow for the cache to be updated when the file is changed. This allows for “live” assertion rotation without restarting.

The file is the assertion in the serialized JWT format:

header.payload.signature
  • The header is a base64url-encoded JWT header that contains values like alg (Algorithm), typ (Type, always the literal value JWT), etc.
  • The payload is the base64url-encoded set of JWT claims, such as aud (Audience), iss (Issuer), sub (Subject), etc.
  • The signature is the base64url-encoded signature computed over the header.payload portion using a private key

See RFC 7519 and RFC 7515 for more details on the JWT and JWS formats.

Mutually exclusive configuration

If sasl.oauthbearer.assertion.file is used, all other sasl.oauthbearer.assertion.* configurations are ignored.

sasl.oauthbearer.assertion.private.key.file | Type: String | Required: No | Default: (none)

File name to a private key in the standard PEM format which is used to sign the assertion sent to the identity provider.

The underlying implementation may choose to cache the file contents to avoid the performance hit of loading the file on each access. If so, it's advisable, but not required, to have a mechanism to detect when the file changes to allow for the cache to be updated when the file is changed. This allows for “live” private key rotation without restarting.

Mutually exclusive configuration

If the sasl.oauthbearer.assertion.file is used, this configuration will be ignored.

sasl.oauthbearer.assertion.private.key.passphrase | Type: Password | Required: No | Default: (none)


The optional passphrase to decrypt the private key file.

Mutually exclusive configuration

If the sasl.oauthbearer.assertion.file is used, this configuration will be ignored.

sasl.oauthbearer.assertion.template.file | Type: String | Required: No | Default: (none)

This optional configuration specifies the file path containing the JWT headers and/or payload claims to be included in the assertion. Not all identity providers require the same set of claims; some may require a given claim while others may prohibit it. In order to provide the most flexibility, this allows the user to provide the static header values and claims that are to be included in the JWT.

Here's an example template file that contains static values.

{
   "header": {
    "kid": "f829d41b06f14f9e",
    "some-random-header": 123456
  },
  "payload": {
    "sub": "some-service-account",
    "aud": "my_audience",
    "iss": "https://example.com",
    "useSomeResource": false,
    "allowedAnimals": [
      "cat",
      "dog",
      "hamster"
    ]
  }
}

The assertion creation step would then augment the header and/or payload with dynamic values.

For example, the above header would be augmented with the alg (algorithm) and typ (type) values per the OAuth RFC:

{
  "kid": "f829d41b06f14f9e",
  "some-random-header": 123456,
  "alg": "RS256",
  "typ": "JWT"
}

And the payload would also be augmented to add the iat (Issued At) and exp (Expiration) timestamps:

{
  "iat": 1741121401,
  "exp": 1741125001,
  "sub": "some-service-account",
  "aud": "my_audience",
  "iss": "https://example.com",
  "useSomeResource": false,
  "allowedAnimals": [
    "cat",
    "dog",
    "hamster"
  ]
}
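
The augmentation described above can be sketched as a merge of the static template values with the dynamic ones. The class and method names are hypothetical, the template is assumed to be already parsed into maps, and letting dynamic values replace same-named template keys is an assumption rather than documented behavior.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only.
final class AssertionTemplateMerge {

    /** Adds the dynamic alg and typ values to the static header from the template. */
    static Map<String, Object> header(Map<String, Object> staticHeader, String algorithm) {
        Map<String, Object> merged = new LinkedHashMap<>(staticHeader);
        merged.put("alg", algorithm);
        merged.put("typ", "JWT");
        return merged;
    }

    /** Adds the dynamic iat and exp timestamps to the static claims from the template. */
    static Map<String, Object> payload(Map<String, Object> staticClaims,
                                       long nowSeconds, long expSeconds) {
        Map<String, Object> merged = new LinkedHashMap<>(staticClaims);
        merged.put("iat", nowSeconds);
        merged.put("exp", nowSeconds + expSeconds);
        return merged;
    }
}
```

Copying into a new map keeps the parsed template untouched, so it can be reused for every assertion until the file changes.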

The underlying implementation may choose to cache the file contents to avoid the performance hit of loading the file on each access. If so, it's advisable, but not required, to have a mechanism to detect when the file changes to allow for the cache to be updated when the file is changed. This allows for “live” rotation of the static JWT contents without restarting.

Please see RFC 7519 and RFC 7515 for more details on the JWT and JWS format.

Mutually exclusive configuration

If the sasl.oauthbearer.assertion.file is used, this configuration will be ignored.

sasl.oauthbearer.client.credentials.client.id | Type: String | Required: No | Default: (none)

The ID (defined in/by the identity provider) to determine the resource requesting the token.

Replaces JAAS option

The client ID was previously stored as part of the sasl.jaas.config configuration with the key clientId. For backward compatibility, the clientId JAAS option can still be used, but a warning message will be logged to alert users of its deprecated state. The JAAS option is tentatively slated for removal in Apache Kafka 5.0.

Order of precedence:

  • sasl.oauthbearer.client.credentials.client.id from configuration
  • clientId from JAAS

sasl.oauthbearer.client.credentials.client.secret | Type: Password | Required: No | Default: (none)

The secret (defined by either the user or preassigned, depending on the identity provider) to determine the resource requesting the token.

Replaces JAAS option

The client secret was previously stored as part of the sasl.jaas.config configuration with the key clientSecret. For backward compatibility, the clientSecret JAAS option can still be used, but a warning message will be logged to alert users of its deprecated state. The JAAS option is tentatively slated for removal in Apache Kafka 5.0.

Order of precedence:

  • sasl.oauthbearer.client.credentials.client.secret from configuration
  • clientSecret from JAAS

sasl.oauthbearer.grant.type | Type: String | Required: No | Default: client_credentials

This is the OAuth grant type to use when communicating with the identity provider. On the whole, the OAuth layer does not rely on this value and expects it to be used and/or verified for correctness by the JwtRetriever implementation.

The default value of client_credentials is used to maintain backward compatibility.

The built-in options are:

  • client_credentials
  • urn:ietf:params:oauth:grant-type:jwt-bearer

The OAuth code in Kafka does not limit the values that are used. A user can write a custom JwtRetriever implementation that uses a completely different grant type, if desired.

sasl.oauthbearer.jwt.retriever.class | Type: Class | Required: No | Default: org.apache.kafka.common.security.oauthbearer.DefaultJwtRetriever

The fully-qualified class name of a JwtRetriever implementation used to request tokens from the identity provider.

The default value represents a class that maintains backward compatibility with previous versions of Apache Kafka. DefaultJwtRetriever uses the configuration to determine which concrete implementation to create.

Other concrete implementations that this KIP introduces are:

  • org.apache.kafka.common.security.oauthbearer.ClientCredentialsJwtRetriever
  • org.apache.kafka.common.security.oauthbearer.FileJwtRetriever
  • org.apache.kafka.common.security.oauthbearer.JwtBearerJwtRetriever

sasl.oauthbearer.jwt.validator.class | Type: Class | Required: No | Default: org.apache.kafka.common.security.oauthbearer.DefaultJwtValidator

The fully-qualified class name of a JwtValidator implementation used to perform validation of the token received from the identity provider.

The default value represents a class that maintains backward compatibility with previous versions of Apache Kafka. DefaultJwtValidator uses the presence of other configuration to determine if it's running as a client or as a broker.

Other concrete implementations that this KIP introduces are:

  • org.apache.kafka.common.security.oauthbearer.BrokerJwtValidator
  • org.apache.kafka.common.security.oauthbearer.ClientJwtValidator

sasl.oauthbearer.scope | Type: String | Required: No | Default: (none)

This is the level of access a client application is granted to a resource or API which is included in the token request. If provided, it should match one or more scopes configured in the identity provider.

Replaces JAAS option

The scope was previously stored as part of the sasl.jaas.config configuration with the key scope. For backward compatibility, the scope JAAS option can still be used, but a warning message will be logged to alert users of its deprecated state. The JAAS option is tentatively slated for removal in Apache Kafka 5.0.

Order of precedence:

  • sasl.oauthbearer.scope from configuration
  • scope from JAAS
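
The order of precedence described for the client ID, client secret, and scope can be sketched as a single lookup helper. The names below are hypothetical, values are treated as plain strings for simplicity, and java.util.logging stands in for whatever logging the real implementation uses.

```java
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical helper, for illustration only.
final class OAuthOptionResolver {
    private static final Logger LOG = Logger.getLogger(OAuthOptionResolver.class.getName());

    /**
     * Returns the value from the Kafka configuration if present; otherwise falls back
     * to the deprecated JAAS option and logs a warning. Returns null if neither is set.
     */
    static String resolve(Map<String, ?> configs, String configKey,
                          Map<String, ?> jaasOptions, String jaasKey) {
        Object fromConfig = configs.get(configKey);
        if (fromConfig != null) {
            return fromConfig.toString();
        }
        Object fromJaas = jaasOptions.get(jaasKey);
        if (fromJaas != null) {
            LOG.warning("JAAS option '" + jaasKey + "' is deprecated; use '" + configKey + "' instead");
            return fromJaas.toString();
        }
        return null;
    }
}
```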

Restricting File Access

The Kafka project has encountered CVEs related to unsafe access to external resources, so care should be taken to ensure that only allowable resources are read. The OAuth layer already has a system property ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG that is an allow list for URLs to access. When OAuth-based clients and brokers access token URLs (sasl.oauthbearer.token.endpoint.url) or JWKS URLs (sasl.oauthbearer.jwks.endpoint.url), the configured values are checked against the allow list in ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG before proceeding. This KIP thus introduces a similar mechanism for file access with a new system property ALLOWED_SASL_OAUTHBEARER_FILES_CONFIG. This configuration is similarly a comma-separated list of file paths that the current process is allowed to access. The following OAuth configuration will be checked against this new system property:

  • sasl.oauthbearer.assertion.file
  • sasl.oauthbearer.assertion.private.key.file
  • sasl.oauthbearer.assertion.template.file

If the configured path for any of the above does not appear in ALLOWED_SASL_OAUTHBEARER_FILES_CONFIG, a ConfigException will be thrown.
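
A minimal sketch of such an allow-list check follows. It makes several assumptions: the class and method names are hypothetical, paths are compared after being made absolute and normalized (the KIP does not specify the comparison), and IllegalArgumentException stands in for Kafka's ConfigException so that the snippet needs only the JDK.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper, for illustration only.
final class AllowedFilesCheck {

    /** Parses a comma-separated list of file paths, e.g. the system property value. */
    static Set<Path> parseAllowList(String value) {
        if (value == null || value.isBlank()) {
            return Set.of();
        }
        return Arrays.stream(value.split(","))
            .map(String::trim)
            .filter(s -> !s.isEmpty())
            .map(s -> Paths.get(s).toAbsolutePath().normalize())
            .collect(Collectors.toSet());
    }

    /** Throws if the configured path is not on the allow list. */
    static void throwIfNotAllowed(String configName, String configuredPath, Set<Path> allowList) {
        Path path = Paths.get(configuredPath).toAbsolutePath().normalize();
        if (!allowList.contains(path)) {
            // Stand-in for ConfigException.
            throw new IllegalArgumentException(
                configuredPath + " for " + configName + " is not an allowed file");
        }
    }
}
```

An empty or missing allow list rejects every path in this sketch, which is the conservative choice.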

Example

Here is an example of configuration for a JWT Bearer enabled Kafka login module (assumes the JVM was started with something like -DALLOWED_SASL_OAUTHBEARER_FILES_CONFIG=/path/to/template.json,/path/to/private.key):

Example configuration
sasl.oauthbearer.jwt.retriever.class=org.apache.kafka.common.security.oauthbearer.JwtBearerJwtRetriever
sasl.oauthbearer.grant.type=urn:ietf:params:oauth:grant-type:jwt-bearer
sasl.oauthbearer.assertion.private.key.file=/path/to/private.key
sasl.oauthbearer.assertion.algorithm=RS256
sasl.oauthbearer.assertion.claim.exp.seconds=600
sasl.oauthbearer.assertion.template.file=/path/to/template.json
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  ssl.protocol="SSL" ;

Proposed Changes

OAuthCompatibility Tool

As part of the work for KIP-768, a standalone tool in the tools directory/module named org.apache.kafka.tools.OAuthCompatibilityTool was created, and this KIP will extend that tool.

The tool's use of command line options results in some issues:

  1. The tool uses a separate command line option for each configuration, which is a fixed set
  2. It potentially exposes sensitive information on the command line (e.g. --client-secret=$3cr3+)
  3. The configuration from the command line options is used for both the client and the broker

This KIP will introduce two new command line options that can be used to work around these limitations:

  1. --client-config: the name of the file containing the client's configuration 
  2. --broker-config: the name of the file containing the broker's configuration

If the user also specifies explicit configuration via the command line, the tool will use the explicit option instead of the corresponding value from the configuration file and will emit a warning to stderr to inform the user.

Take this example where both configuration style arguments are used:

./bin/kafka-run-class.sh org.apache.kafka.tools.OAuthCompatibilityTool \
--client-id foo \
--client-secret bar \
--client-config /tmp/client.properties \
--broker-config /tmp/broker.properties

Here, all the configuration for both the client and broker will be used except the value of sasl.jaas.config for the client. In this case, the value of sasl.jaas.config from the client configuration file will be ignored and replaced with a new value generated from --client-id and --client-secret.
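
The override behavior in this example could be sketched roughly as follows. This is an illustration only: the class and method names are hypothetical, the exact JAAS string the tool generates is an assumption (it uses the clientId and clientSecret JAAS options), and quoting or escaping of the values is ignored.

```java
import java.util.Properties;

/** Hypothetical sketch of the override logic; not the tool's actual code. */
final class ClientConfigMerge {

    static final String JAAS_CONFIG = "sasl.jaas.config";
    static final String LOGIN_MODULE =
            "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule";

    /**
     * Returns a copy of the file-based client configuration. When both
     * --client-id and --client-secret were given (non-null), sasl.jaas.config
     * is replaced with a value generated from them.
     */
    static Properties merge(Properties fileConfig, String clientId, String clientSecret) {
        Properties merged = new Properties();
        merged.putAll(fileConfig);

        if (clientId != null && clientSecret != null) {
            if (fileConfig.containsKey(JAAS_CONFIG)) {
                // Warn on stderr, without echoing the secret itself.
                System.err.println("WARNING: " + JAAS_CONFIG + " from the configuration file"
                        + " is ignored because --client-id and --client-secret were specified");
            }
            // Assumed format of the generated JAAS value.
            merged.setProperty(JAAS_CONFIG, LOGIN_MODULE + " required"
                    + " clientId=\"" + clientId + "\""
                    + " clientSecret=\"" + clientSecret + "\" ;");
        }
        return merged;
    }
}
```

All other keys loaded from /tmp/client.properties pass through unchanged, and the broker configuration is not touched by this step.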

File Reloading

The JWT Bearer implementation relies on files to generate its assertion. The implementation should automatically detect changes on the underlying file system, then reload and cache the file contents. This is mostly to allow secret rotation for sasl.oauthbearer.assertion.private.key.file and sasl.oauthbearer.assertion.file, but it is also useful for sasl.oauthbearer.assertion.template.file. The exact mechanism to achieve this is specific to each Kafka client, but users should expect the files to be reloaded "reasonably" quickly. A lag of a few seconds between the file system change and when the application reloads the file is acceptable.
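
One possible way to approach this in Java is sketched below: cache the file contents and re-read them whenever the file's last-modified time changes. This is a minimal sketch under stated assumptions, not the mechanism this KIP mandates; the class name is hypothetical, and a real client might use java.nio.file.WatchService or a background refresh thread instead.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;

/** Hypothetical cache that reloads a file when its modification time changes. */
final class ReloadingFileCache {

    private final Path path;
    private FileTime lastModified; // modification time of the cached contents
    private String contents;       // cached contents, null until first read

    ReloadingFileCache(Path path) {
        this.path = path;
    }

    /** Returns the cached contents, re-reading the file if it has changed. */
    synchronized String get() throws IOException {
        FileTime current = Files.getLastModifiedTime(path);
        if (contents == null || !current.equals(lastModified)) {
            contents = new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
            lastModified = current;
        }
        return contents;
    }
}
```

Because the check runs on each call to get(), a rotated key would be picked up the next time an assertion is built. The trade-off is that two writes within the file system's timestamp granularity may not be detected.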

Compatibility, Deprecation, and Migration Plan

The addition of JWT Bearer support was designed to not affect existing users:

  1. The configuration for the client credentials flow remains the same
  2. Three JAAS options (clientId, clientSecret, and scope) for the existing client_credentials grant type are deprecated in favor of the sasl.oauthbearer.client.credentials.client.id, sasl.oauthbearer.client.credentials.client.secret, and sasl.oauthbearer.scope configuration, respectively. In the case where the user does not migrate to the configuration-based approach and continues to use JAAS, behavior remains the same as before this KIP. In the case where both a JAAS value and its respective configuration are provided, the configuration will take precedence.
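
The precedence rule in item 2 can be sketched as a simple lookup: consult the top-level configuration first and fall back to the deprecated JAAS option only when the configuration is absent. The class and method names below are hypothetical and serve only to illustrate the rule.

```java
import java.util.Map;

/** Hypothetical resolver illustrating configuration-over-JAAS precedence. */
final class ClientCredentialsResolver {

    /**
     * Returns the top-level configuration value if present, otherwise the
     * deprecated JAAS option value, otherwise null.
     */
    static String resolve(Map<String, ?> configs, String configName,
                          Map<String, ?> jaasOptions, String jaasOptionName) {
        Object fromConfig = configs.get(configName);
        if (fromConfig != null) {
            return fromConfig.toString(); // configuration takes precedence
        }
        Object fromJaas = jaasOptions.get(jaasOptionName);
        return fromJaas == null ? null : fromJaas.toString();
    }
}
```

For example, resolving sasl.oauthbearer.client.credentials.client.id against the JAAS option clientId returns the JAAS value only when the configuration is not set, which matches the pre-KIP behavior for users who have not migrated.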

Test Plan

As part of the work for KIP-768, a standalone tool in the tools directory/module named org.apache.kafka.tools.OAuthCompatibilityTool was created, and this KIP will build on that tool.

There is currently a testing gap for integration and system testing for the OAuth code. In order to test OAuth code, we need an OAuth-compatible service that meets the following requirements:

  1. Open source using an Apache-compatible license
  2. Can be dynamically started, configured, and shut down
    1. For integration tests, this means it has to be written in Java
  3. Is mature enough to rely on

The following projects were investigated but disqualified:

  1. Keycloak (using testcontainers) is very capable, but does not support the JWT Bearer grant type
  2. Apache Shiro is less mature, and also does not support the JWT Bearer grant type
  3. Spring Security was very heavyweight and it was not immediately obvious how to integrate it into our testing framework

In the end the mock-oauth2-server project was selected as it fits the criteria:

  • Is based on an Apache-compatible license (MIT)
  • Is written in Java, and the lifecycle and configuration of the OAuth library are extensible
  • Has been around for at least four years and appears to be widely used and maintained

This library will be used for the integration test layer to allow clients and brokers to communicate end-to-end with an "external" identity provider.

Rejected Alternatives

Using JAAS Options Instead of Configuration

Initially, the new settings were to be added as options in the JAAS configuration (sasl.jaas.config), as the existing OAuth implementation does. However, after more thought and discussion, it was decided to make them top-level configuration. Since the OAuth standard encompasses a broad range of options, and different organizations have varied security policies, there was concern that over time this could increase the amount of configuration that users and operators must manage. Nevertheless, top-level configuration gives users and operators the following benefits:

  • Auto-generated documentation
  • Troubleshooting using runtime logs
  • Deployment flexibility

Support for Multiple Grant Types

Adding comprehensive support for every supported grant type specified in the RFC is not the goal of the KIP. The current approach is to add support for additional grant types as they gain adoption by users in the Kafka community. In addition, not all OAuth identity providers support the client_credentials grant type, which then precludes using Apache Kafka’s current support for OAuth.
