DUE TO SPAM, SIGN-UP IS DISABLED. Goto Selfserve wiki signup and request an account.
Status
Current state: Accepted
Discussion thread: here
JIRA:
KAFKA-18573
-
Getting issue details...
STATUS
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Apache Kafka added basic support for machine-to-machine (M2M) OAuth 2.0 flows on the client in KIP-768. Though the OAuth 2.0 family of specifications support several grant types applicable in M2M applications, as of Apache Kafka 4.0, only the client_credentials grant type is supported. Some organizations are wary of using the current client_credentials grant type implementation since it requires a secret to be specified in plain text in the configuration. For that reason, not all cloud providers support the client_credentials grant type.
Primary Motivator: jwt-bearer Grant Type Support
The focus of this KIP is to introduce support for the OAuth 2.0 urn:ietf:params:oauth:grant-type:jwt-bearer grant type as defined in RFC 7523. RFC 7523 details how a client can authenticate using an assertion (a cryptographically signed JWT defined in RFC 7521) instead of a plain text secret when requesting an OAuth 2.0 token. This reduces potential security compromises as the static secret is not part of the deployment configuration directly and the information used to create the assertion can be rotated as dictated by the organization's policy.
Shorthand
urn:ietf:params:oauth:grant-type:jwt-bearer.
Here is the basic flow when using the jwt-bearer grant type with private key information:
- The Apache Kafka client first reads any configuration and/or data used to create the assertion
- A temporary JWT is created on the client that contains claims (e.g.
iss,sub,aud,iat,exp, as well as arbitrary custom claims (RFC 7523, section 3)) - The assertion is created by the client ("self-signed") using a configured private key (typically RSA256) to encrypt the JWT (RFC 7515, section 3), thus allowing the client to authenticate without secrets
- The assertion is exchanged with the identity provider using the HTTP parameters
assertionandgrant_typewith a value ofurn:ietf:params:oauth:grant-type:jwt-bearer(RFC 7523, section 2.1) - If the assertion is valid, a JWT is returned that contains claims that are germane to the Apache Kafka broker
- As part of the connection handshake process, the JWT is then sent from the client to the broker
- Brokers will fetch and cache key sets (JWKS) from the identity provider that contains the information necessary to validate the JWT from the client
- The identity provider will return the JWKS for use by the broker
- Using the JWKS and the claims in the JWT received by the client, the broker will validate that the JWT contains expected data
- The broker will continue its process for initializing the connection and eventually return success (or failure) to the client
This OAuth flow allows clients to authenticate using an assertion instead of relying on client credentials or refresh tokens. Because this mechanism does not require long-lived client secrets, the Kafka ecosystem would benefit from the improved security and simplified credential management. Additionally, JWTs allow embedding structured claims, making them useful for role-based access control (RBAC) and multi-tenant deployments. As it is an industry standard, the JWT Bearer grant type is supported by many identity identity providers; in some cases, the JWT Bearer grant type may be the only grant type a given identity provider supports.
Secondary Motivator: Extensibility
A secondary motivation for this KIP is to refactor the existing code for future OAuth 2.0 work. Much of the existing implementation of the OAuth support is currently hidden inside internal packages that are not meant for outside usage. This has burdened users that want to extend Kafka's OAuth support to enable specific use cases; it forces them to either reinvent much of the logic or to tie their code to the internal APIs anyway. It has been noted that other downstream projects are coding against these internal APIs anyway, so exposing some basic building blocks could be useful to avoid toil. So in addition to supporting the jwt-bearer grant type, many of the changes in this KIP are designed to allow future extension of the OAuth support with less encumbrance.
Reuse of Existing OAuth Primitives
The addition of support for the JWT Bearer grant type flow and associated refactoring builds upon the other services and primitives in the existing Kafka OAuth layer. The following are still applicable for the new JWT Bearer support:
- Cluster operators can continue to use the
connections.max.reauth.msconfiguration to trigger token refresh on the client- Client token refresh will use the JWT Bearer logic instead of the Client Credentials flow
- Tokens acquired via the JWT Bearer flow still use the OAuth JWT validation mechanisms in the broker
The JWT Bearer work is additive, not a replacement.
Public Interfaces
Classes
All classes, interfaces, and exceptions reside in the org.apache.kafka.common.security.oauthbearer package.
JwtRetriever
/**
* An implementation of <code>JwtRetriever</code> is the means by which the login module will
* retrieve an OAuth JWT that is used to authorize with a broker. The implementation may
* involve communicating with remote systems, local files, or reading contents of a
* configuration setting.
*
* <p/>
*
* <i>Retrieval</i> of a token is a separate concern from <i>validation</i>.
* <code>JwtRetriever</code> implementations should not validate the integrity of the
* token, but should rely on the companion {@link JwtValidator} for that task.
*
* <p/>
*
* Because a <code>JwtRetriever</code> is instantiated from its class name, the
* implementation must include a no-argument constructor. This interface extends from
* {@link OAuthBearerConfigurable}, which is analogous to {@link Configurable} in that
* implementations are expected to wait until that method is called to create any
* resources. Any such resources should be closed in {@link #close()}.
*/
public interface JwtRetriever extends OAuthBearerConfigurable {
/**
* Retrieves a JWT in its serialized three-part form. The implementation is free to
* determine how it should be retrieved but should not perform validation on the result.
*
* <p/>
*
* <b>Note</b>: This is a blocking function and callers should be aware that the
* implementation may be communicating over a network, with the file system, coordinating
* threads, etc. The facility in the {@link LoginModule} from which this is ultimately
* called does not provide an asynchronous approach.
*
* @return Non-<code>null</code> JWT string
*
* @throws JwtRetrieverException For errors related to I/O, parsing, etc. during retrieval
*/
String retrieve() throws JwtRetrieverException;
/**
* Closes any resources used by this implementation. The default implementation of
* this method is a no op, for convenience to implementors.
*/
@Override
default void close() throws IOException {
// Do nothing...
}
}
The following are stock implementations of JwtRetriever that can be used by configuring sasl.oauthbearer.jwt.retriever.class. The following classes do not expose any public methods other than those in JwtRetriever.
ClientCredentialsJwtRetriever
This implementation of JwtRetriever communicates with an identity provider directly via HTTP. It posts client credentials that are derived from the sasl.oauthbearer.client.credentials.client.id and sasl.oauthbearer.client.credentials.client.secret configuration by encoding them in the standard HTTP Authorization header. The optional sasl.oauthbearer.scope configuration value is included in the OAuth scope HTTP parameter, if set. The URL to which this information is sent is set in the sasl.oauthbearer.token.endpoint.url configuration.
Existing internal class
The logic used for this class has been present in Apache Kafka since 3.5.0. It is being promoted from an internal class to a public class for use in sasl.oauthbearer.jwt.retriever.class.
DefaultJwtRetriever
Per its name, this class serves as the default implementation of JwtRetriever that will be used if sasl.oauthbearer.jwt.retriever.class is not configured. It provides backward compatibility with previous versions of Kafka that did not let the user choose the JwtRetriever. At runtime, depending on the configuration, this class will create and delegate to an instance of one of these implementations:
ClientCredentialsJwtRetrieverFileJwtRetrieverJwtBearerJwtRetriever
The following flowchart shows the logic to select the correct delegate:

FileJwtRetriever
This JwtRetriever implementation interprets the sasl.oauthbearer.token.endpoint.url configuration as a local filesystem path, e.g. file:/super/secure/jwt.txt. At runtime, the contents of the file are read and assumed to be in a serialized JWT.
Existing internal class
The logic used for this class has been present in Apache Kafka since 3.5.0. It is being promoted from an internal class to a public class for use in sasl.oauthbearer.jwt.retriever.class.
JwtBearerJwtRetriever
This implementation of JwtRetriever communicates with an identity provider directly via HTTP. It posts a client-signed assertion using the configuration in the assertion HTTP parameter. The optional sasl.oauthbearer.scope configuration value is included in the OAuth scope HTTP parameter, if set. The URL to which this information is sent is set in the sasl.oauthbearer.token.endpoint.url configuration.
JwtValidator
/**
* An instance of <code>JwtValidator</code> acts as a function object that, given a String in
* base 64-encoded JWT format, can parse the data, perform validation, and construct an
* {@link OAuthBearerToken} for use by the caller.
*
* <p/>
*
* The primary reason for this abstraction is that client and broker may have different libraries
* available to them to perform these operations. Additionally, the exact steps for validation may
* differ between implementations. To put this more concretely: the implementation in the Kafka
* client does not have bundled a robust library to perform this logic, and it is not the
* responsibility of the client to perform vigorous validation. However, the Kafka broker ships with
* a richer set of library dependencies that can perform more substantial validation and is also
* expected to perform a trust-but-verify test of the token's signature.
*
* See:
*
* <ul>
* <li><a href="https://datatracker.ietf.org/doc/html/rfc6749#section-1.4">RFC 6749, Section 1.4</a></li>
* <li><a href="https://datatracker.ietf.org/doc/html/rfc6750#section-2.1">RFC 6750, Section 2.1</a></li>
* <li><a href="https://datatracker.ietf.org/doc/html/draft-ietf-oauth-access-token-jwt">RFC 6750, Section 2.1</a></li>
* </ul>
*
* @see DefaultJwtValidator Default validator that acts as a wrapper over one of the other validators
* @see ClientJwtValidator A basic JwtValidator used by client-side login authentication
* @see BrokerJwtValidator A more robust JwtValidator that is used on the broker
* to validate the token's contents and verify the signature
*/
public interface JwtValidator extends OAuthBearerConfigurable {
/**
* Accepts an OAuth JWT in base 64-encoded format, validates, and returns an
* OAuthBearerToken.
*
* @param token Non-<code>null</code> String in JWT format
*
* @return {@link OAuthBearerToken}
*
* @throws JwtValidatorException Thrown on errors performing validation of given token
*/
OAuthBearerToken validate(String token) throws JwtValidatorException;
/**
* Closes any resources used by this implementation. The default implementation of
* this method is a no op, for convenience to implementors.
*/
@Override
default void close() throws IOException {
// Do nothing...
}
}
The following are stock implementations of JwtValidator that can be used by configuring sasl.oauthbearer.jwt.validator.class appropriately. The following classes do not expose any public methods other than those in JwtValidator.
BrokerJwtValidator
This implementation of JwtValidator is used by the broker to perform more extensive validation of the JWT that is received from the client, but ultimately from posting the client credentials to the identity provider.
The validation steps performed (primary by the jose4j library) are:
- Basic structural validation of the b64token value as defined in RFC 6750 Section 2.1
- Basic conversion of the token into an in-memory claims map
- Presence of
scope,exp,sub,iss, andiatclaims - Signature matching validation against the
kidand those provided by the OAuth/OIDC provider's JWKS
Existing internal class
The logic used for this class has been present in Apache Kafka since 3.5.0. It is being promoted from an internal class to a public class for use in sasl.oauthbearer.jwt.validator.class.
ClientJwtValidator
This implementation of JwtValidator is used by the client to perform some rudimentary validation of the JWT that is received as part of the response from posting the client credentials to the identity provider.
The validation steps performed are:
- Basic structural validation of the b64token value as defined in RFC 6750 Section 2.1
- Basic conversion of the token into an in-memory claims map
- Presence of
scope,exp,sub, andiatclaims
Existing internal class
The logic used for this class has been present in Apache Kafka since 3.5.0. It is being promoted from an internal class to a public class for use in sasl.oauthbearer.jwt.validator.class.
DefaultJwtValidator
This class is the default implementation of JwtValidator that is used if sasl.oauthbearer.jwt.validator.class is not configured. It provides backward compatibility with previous versions of Kafka that did not let the user choose the JwtValidator. At runtime, depending on the configuration, this class will create and delegate to an instance of one of these implementations:
BrokerJwtValidatorClientJwtValidator
The following flowchart shows the logic to select the correct delegate:

Existing internal class
The logic used for this class has been present in Apache Kafka since 3.5.0. It is being promoted from an internal class to a public class for use in sasl.oauthbearer.jwt.validator.class.
JwtRetrieverException
/**
* A {@code JwtRetrieverException} is thrown in cases where the JWT cannot be retrieved.
*
* @see JwtRetriever#retrieve()
*/
public class JwtRetrieverException extends KafkaException {
public JwtRetrieverException(String message) {
super(message);
}
public JwtRetrieverException(Throwable cause) {
super(cause);
}
public JwtRetrieverException(String message, Throwable cause) {
super(message, cause);
}
}
JwtValidatorException
/**
* A {@code JwtValidatorException} is thrown in cases where the validity of a JWT cannot be
* determined. It is intended to be used when errors arise within the processing of a
* {@link CallbackHandler#handle(Callback[])}. This error, however, is not thrown from that
* method directly.
*
* @see JwtValidator#validate(String)
*/
public class JwtValidatorException extends KafkaException {
public JwtValidatorException(String message) {
super(message);
}
public JwtValidatorException(Throwable cause) {
super(cause);
}
public JwtValidatorException(String message, Throwable cause) {
super(message, cause);
}
}
OAuthBearerConfigurable
/**
* Analogue to {@link Configurable} for OAuth-based authentication. This interface presents a similar
* method signature as that of the {@link AuthenticateCallbackHandler} interface. However, this interface is
* needed because {@link AuthenticateCallbackHandler} extends the JDK's {@link CallbackHandler} interface.
*
* <p/>
*
* <em>Note</em>:
*
* <ol>
* <li>
* Any class that <em>implements</em> this interface should initialize resources via
* {@link #configure(Map, String, List)} and release them via {@link #close()}.
* </li>
* <li>
* Any class that <em>instantiates</em> an object that implements {@code OAuthBearerConfigurable}
* must properly call that object's ({@link #configure(Map, List)} and {@link #close()}) methods
* so that the object can initialize and release resources.
* </li>
* </ol>
*/
public interface OAuthBearerConfigurable extends Closeable {
/**
* Configures this object for the specified SASL mechanism.
*
* @param configs Key-value pairs containing the parsed configuration options of
* the client or broker. Note that these are the Kafka configuration options
* and not the JAAS configuration options. JAAS config options may be obtained
* from `jaasConfigEntries`. For configs that may be specified as both Kafka config
* as well as JAAS config (e.g. sasl.kerberos.service.name), the configuration
* is treated as invalid if conflicting values are provided.
* @param jaasConfigEntries JAAS configuration entries from the JAAS login context.
* This list contains a single entry for clients and may contain more than
* one entry for brokers if multiple mechanisms are enabled on a listener using
* static JAAS configuration where there is no mapping between mechanisms and
* login module entries. In this case, implementations can use the login module in
* `jaasConfigEntries` to identify the entry corresponding to `saslMechanism`.
* Alternatively, dynamic JAAS configuration option
* {@link org.apache.kafka.common.config.SaslConfigs#SASL_JAAS_CONFIG} may be
* configured on brokers with listener and mechanism prefix, in which case
* only the configuration entry corresponding to `saslMechanism` will be provided
* in `jaasConfigEntries`.
*/
void configure(Map<String, ?> configs, List<AppConfigurationEntry> jaasConfigEntries);
}
Configuration
Broker Naming Convention
When used for inter-broker communication, the configuration is name prefixed with the listener name:
listener.name.<listener name>.oauthbearer.
So if a broker has a listener named external...
sasl.oauthbearer.token.endpoint.url
...would become...
listener.name.external.oauthbearer.sasl.oauthbearer.token.endpoint.url
Configuration Listing
The following table lists the configuration that is being introduced in this KIP.
| Configuration | Type | Required | Default | Description |
|---|---|---|---|---|
sasl.oauthbearer.assertion.algorithm | String | No |
| This is the algorithm the client should use to sign the assertion sent to the identity provider and in the OAuth Valid options are:
Mutually exclusive configuration If the |
sasl.oauthbearer.assertion.claim.aud | String | No | The value to be used as the Mutually exclusive configuration If the | |
sasl.oauthbearer.assertion.claim.exp.seconds | Int | No | 300 | The number of seconds in the future that the JWT is valid. Used to determine the let x = the current timestamp in seconds let y = the value of this configuration then exp = $x + $y Mutually exclusive configuration If the |
sasl.oauthbearer.assertion.claim.iss | String | No | The value to be used as the Mutually exclusive configuration If the Mutually exclusive configuration If the | |
sasl.oauthbearer.assertion.claim.jti.include | boolean | No | false | This flag determines if the assertion should add a unique ID for the Mutually exclusive configuration If the |
sasl.oauthbearer.assertion.claim.nbf.seconds | Int | No | 60 | The number of seconds in the past that the JWT is valid. Used to determine the let x = the current timestamp in seconds let y = the value of this configuration then nbf = $x - $y Mutually exclusive configuration If the |
sasl.oauthbearer.assertion.claim.sub | String | No | The value to be used as the Mutually exclusive configuration If the | |
sasl.oauthbearer.assertion.file | String | No | Optional file name the user can specify a file in which a pre-generated assertion exists as a local file on disk. The underlying implementation may choose to cache the file contents to avoid the performance hit of loading the file on each access. If so, it's advisable, but not required, to have a mechanism to detect when the file changes to allow for the cache to be updated when the file is changed. This allows for “live” assertion rotation without restarting. The file is the assertion in the serialized JWT format: header.payload.signature
See RFC 7519 and RFC 7515 for more details on the JWT and JWS formats. Mutually exclusive configuration If | |
sasl.oauthbearer.assertion.private.key.file | String | No | File name to a private key in the standard PEM format which is used to sign the assertion sent to the identity provider. The underlying implementation may choose to cache the file contents to avoid the performance hit of loading the file on each access. If so, it's advisable, but not required, to have a mechanism to detect when the file changes to allow for the cache to be updated when the file is changed. This allows for “live” private key rotation without restarting. Mutually exclusive configuration If the | |
sasl.oauthbearer.assertion.private.key.passphrase | Password | No | The optional passphrase to decrypt the private key file. Mutually exclusive configuration If the | |
sasl.oauthbearer.assertion.template.file | String | No | This optional configuration specifies the file path containing the JWT headers and/or payload claims to be included in the assertion. Not all identity providers require the same set of claims; some may require a given claim while others may prohibit it. In order to provide the most flexibility, this allows the user to provide the static header values and claims that are to be included in the JWT. Here's an example template file that contains static values. {
"header": {
"kid": "f829d41b06f14f9e",
"some-random-header": 123456
},
"payload": {
"sub": "some-service-account",
"aud": "my_audience",
"iss": "https://example.com",
"useSomeResource": false,
"allowedAnimals": [
"cat",
"dog",
"hamster"
]
}
}
The assertion creation step would then augment the header and/or payload with dynamic values. For example, the above header would be augmented with the {
"kid": "f829d41b06f14f9e",
"some-random-header": 123456,
"alg": "RS256",
"typ": "JWT"
}
And the payload would also be augmented to add the {
"iat": 1741121401,
"exp": 1741125001,
"sub": "some-service-account",
"aud": "my_audience",
"iss": "https://example.com",
"useSomeResource": false,
"allowedAnimals": [
"cat",
"dog",
"hamster"
]
}
The underlying implementation may choose to cache the file contents to avoid the performance hit of loading the file on each access. If so, it's advisable, but not required, to have a mechanism to detect when the file changes to allow for the cache to be updated when the file is changed. This allows for “live” rotation of the static JWT contents without restarting. Please see RFC 7519 and RFC 7515 for more details on the JWT and JWS format. Mutually exclusive configuration If the | |
sasl.oauthbearer.client.credentials.client.id | String | No | The ID (defined in/by the identity provider) to determine the resource requesting the token. Replaces JAAS option The client ID was previously stored as part of the Order of precedence:
| |
sasl.oauthbearer.client.credentials.client.secret | Password | No | The secret (defined by either the user or preassigned, depending on the identity provider) to determine the resource requesting the token. Replaces JAAS option The client secret was previously stored as part of the Order of precedence:
| |
sasl.oauthbearer.grant.type | String | No | client_credentials | This is the OAuth grant type to use when communicating with the identity provider. On the whole, the OAuth layer does not rely on this value and expects it to be used and/or verified for correctness by the The default value of The built-in options are:
The OAuth code in Kafka does not limit the values that are used. A user can write a custom |
sasl.oauthbearer.jwt.retriever.class | Class | No | org.apache.kafka.common.security.oauthbearer.DefaultJwtRetriever | The fully-qualified class name of a The default value represents a class that maintains backward compatibility with previous versions of Apache Kafka. Other concrete implementations that this KIP introduces are:
|
sasl.oauthbearer.jwt.validator.class | Class | No | org.apache.kafka.common.security.oauthbearer.DefaultJwtValidator | The fully-qualified class name of a The default value represents a class that maintains backward compatibility with previous versions of Apache Kafka. Other concrete implementations that this KIP introduces are:
|
sasl.oauthbearer.scope | String | No | This is the level of access a client application is granted to a resource or API which is included in the token request. If provided, it should match one or more scopes configured in the identity provider. Replaces JAAS option The scope was previously stored as part of the Order of precedence:
|
Restricting File Access
The Kafka project has encountered CVEs related to unsafe access to external resources, so care should be taken to ensure that only allowable resources are read. The OAuth layer already has a system property ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG that is an allow list for URLs to access. When OAuth-based clients and brokers access token URLs (sasl.oauthbearer.token.endpoint.url) or JWKS URLs (sasl.oauthbearer.jwks.endpoint.url), the configured values are checked against the the allow list in ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG before proceeding. This KIP thus introduces a similar mechanism for file access with a new system property ALLOWED_SASL_OAUTHBEARER_FILES_CONFIG. This configuration is similarly a comma-separated list of file paths that the current process is allowed to access. The following OAuth configuration will be checked against this new system property:
sasl.oauthbearer.assertion.filesasl.oauthbearer.assertion.private.key.filesasl.oauthbearer.assertion.template.file
If the configured path for any of the above do not appear in ALLOWED_SASL_OAUTHBEARER_FILES_CONFIG, a ConfigException will be thrown.
Example
Here is an example of configuration for a JWT Bearer enabled Kafka login module (assumes the JVM was started with something like -DALLOWED_SASL_OAUTHBEARER_FILES_CONFIG=/path/to/template.json,/path/to/private.key):
sasl.oauthbearer.jwt.retriever.class=org.apache.kafka.common.security.oauthbearer.JwtBearerJwtRetriever sasl.oauthbearer.grant.type=urn:ietf:params:oauth:grant-type:jwt-bearer sasl.oauthbearer.assertion.private.key.file=/path/to/private.key sasl.oauthbearer.assertion.algorithm=RS256 sasl.oauthbearer.assertion.claim.exp.seconds=600 sasl.oauthbearer.assertion.template.file=/path/to/template.json sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ ssl.protocol="SSL" ;
Proposed Changes
OAuthCompatibility Tool
As part of the work for KIP-768, a standalone tool in the tools directory/module named org.apache.kafka.tools.OAuthCompatibilityTool was created, and this KIP will extend that tool.
The tool's use of command line options result in some issues:
- The tool uses a separate command line option for each configuration, which is a fixed set
- It potentially exposes sensitive information on the command line (e.g.
--client-secret=$3cr3+) - The configuration from the command line options are used for the client and the broker
This KIP will introduce two new command line options that can be used to work around these limitations:
--client-config: the name of the file containing the client's configuration--broker-config: the name of the file containing the broker's configuration
If the user specifies any explicit configuration via the command line, the tool will use the explicit option instead of the one from the configuration file, with a warning message emitted to stderr to inform users of the fact
Take this example where both configuration style arguments are used:
./bin/kafka-run-class.sh org.apache.kafka.tools.OAuthCompatibilityTool \
--client-id foo \
--client-secret bar \
--client-config /tmp/client.properties \
--broker-config /tmp/broker.properties
Here, all the configuration for both the client and broker will be used except the value of sasl.jaas.config for the client. In this case, the value of sasl.jaas.config from the client configuration file will be ignored and replaced with a new value generated from --client-id and --client-secret.
File Reloading
The JWT Bearer implementation relies on files to generate its assertion. The implementation should automatically detect when changes occur on the underlying filesystem and reload and cache file contents. This is mostly to allow secret rotation for sasl.oauthbearer.assertion.private.key.file and sasl.oauthbearer.assertion.file, but it's also useful for sasl.oauthbearer.assertion.template.file. The exact mechanism to achieve this is specific for each Kafka client, but users should expect the files to be reloaded "reasonably" quickly. A lag of a few seconds between the file system change and when the application reloads the file is acceptable.
Compatibility, Deprecation, and Migration Plan
The addition of JWT Bearer support was designed to not affect existing users:
- The configuration for the client credentials flow remains the same
- Three JAAS options (
clientId,clientSecret, andscope) for the existingclient_credentialsgrant type are deprecated in favor of thesasl.oauthbearer.client.credentials.client.id,sasl.oauthbearer.client.credentials.client.secret, andsasl.oauthbearer.scopeconfiguration, respectively. In the case where the user does not migrate to the configuration-based approach and remains using JAAS, behavior remains the same as before this KIP. In the case where both a JAAS value and its respective configuration are provided, the configuration will take precedence.
Test Plan
As part of the work for KIP-768, a standalone tool in the tools directory/module named org.apache.kafka.tools.OAuthCompatibilityTool was created, and this KIP will build on that tool.
There is currently a testing gap for integration and system testing for the OAuth code. In order to test OAuth code, we need an OAuth-compatible service that meets the following requirements:
- Open source using an Apache-compatible license
- Can be dynamically started, configured, and shut down
- For integration tests, this means it has to be written in Java
- Is mature enough to rely on
There following projects were investigated but disqualified:
- Keycloak (using testcontainers) is very capable, but does not support the JWT Bearer grant type
- Apache Shiro is less mature, and also does not support the JWT Bearer grant type
- Spring Security was very heavyweight and it was not immediately obvious how to integrate it into our testing framework
In the end the mock-oauth2-server project was selected as it fits the criteria:
- Is based on an Apache-compatible license (MIT)
- Is written in Java and the lifecycle and configuration of the OAuth library is extensible
- It has been around for at least four years and appears to be widely used and maintained
This library will be used for the integration test layer to allow clients and brokers to communicate end-to-end with an "external" identity provider.
Rejected Alternatives
Using JAAS Options Instead of Configuration
Initially, the configuration was to be made keys in the JAAS configuration (sasl.jaas.config) as the existing OAuth implementation does. However, after more thought and discussion, it was decided to make them top-level configuration. Since the OAuth standard encompasses a broad range of options, and different organizations have varied security policies, there was concern that over time this could increase the configuration for the users and operators. However, configuration gives users and operators the following benefits:
- Auto-generated documentation
- Troubleshooting using runtime logs
- Deployment flexibility
Support for Multiple Grant Types
Adding comprehensive support for every supported grant type specified in the RFC is not the goal of the KIP. The current approach is to add support for additional grant types as they gain adoption by users in the Kafka community. In addition, not all OAuth identity providers support the client_credentials grant type, which then precludes using Apache Kafka’s current support for OAuth.