Status
Current state: Adopted (in 2.0)
Discussion thread: here
JIRA: KAFKA-6562
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
The ability to authenticate to Kafka with an OAuth 2 Access Token is desirable given the popularity of OAuth. OAuth 2 is a flexible framework with multiple ways of doing the same thing (for example, getting a public key to confirm a digital signature), so pluggable implementations (and flexibility in their configuration) are a requirement; the introduction of KIP-86: Configurable SASL callback handlers provides the necessary flexibility. "OAUTHBEARER" is the SASL mechanism for OAuth 2.
This KIP proposes to add the following functionality related to SASL/OAUTHBEARER:
- Allow clients (both brokers when SASL/OAUTHBEARER is the inter-broker protocol as well as non-broker clients) to flexibly retrieve an access token from an OAuth 2 authorization server based on the declaration of a custom login CallbackHandler implementation and have that access token transparently and automatically transmitted to a broker for authentication.
- Allow brokers to flexibly validate provided access tokens when a client establishes a connection based on the declaration of a custom SASL Server CallbackHandler implementation.
- Provide implementations of the above retrieval and validation features based on an unsecured JSON Web Token that function out-of-the-box with minimal configuration required (i.e. implementations of the two types of callback handlers mentioned above will be used by default with no need to explicitly declare them). This unsecured functionality serves two purposes: first, it provides a way for SASL/OAUTHBEARER to be used in development scenarios out-of-the-box with no OAuth 2 infrastructure required; and second, it provides a way to test the SASL implementation itself. See Rejected Alternatives: Motivation.
- Allow clients (both brokers when SASL/OAUTHBEARER is the inter-broker protocol as well as non-broker clients) to transparently retrieve a new access token in the background before the existing access token expires in case the client has to open new connections. Any existing connections will remain unaffected by this "token refresh" functionality as long as the connection remains intact, but new connections from the same client will always use the latest access token (either the initial one or the one that was most recently retrieved by the token refresh functionality, if any). This is how Kerberos-authenticated connections work with respect to ticket expiration. This KIP does not attempt to unify the refresh-related code for the OAUTHBEARER and GSSAPI mechanisms, but it does include code that suggests a potential path forward if this unification is desired in the future.
Note that the access token can be made available to the broker for authorization decisions due to KIP-189: Improve principal builder interface and add support for SASL (by exposing the access token via a negotiated property on the SaslServer implementation), but detailed discussion of this possibility is outside the scope of this proposal. It is noted, however, that if access tokens are somehow used for authorization decisions, it is conceivable due to the long-lived nature of Kafka connections that authorization decisions will sometimes be made using expired access tokens. For example, it is up to the broker to validate the token upon authentication, but the token will not be replaced for that particular connection as long as it remains intact; if the token expires in an hour then authorization decisions for that first hour will be made using the still-valid token, but after an hour the expired token would remain associated with the connection, and authorization decisions from that point forward for that particular connection would be made using the expired token. This would have to be addressed via a separate KIP if it turns out to be problematic, but that seems unlikely (code signing certificates that have been timestamped remain valid after their expiration, for example, and access tokens are indeed timestamped).
Another related issue that would need to be addressed is how to revoke authorizations. Connections are long-lived, and bearer tokens are immutable, so a mechanism to evolve or revoke permissions over time would have to exist. Again, this is outside the scope of this KIP.
Finally, note that the implementation of flexible, substitution-aware configuration that was originally proposed in an early draft of this KIP was at first deemed more generally useful and was separated out into its own KIP-269: Substitution Within Configuration Values, but that KIP is likely to be rejected/moved to the inactive list and is not required for this KIP (see Rejected Alternatives: Substitution Within Configuration Values).
Public Interfaces
The public interface for this KIP consists of 3 Java classes and 1 Java interface along with various configuration possibilities. The following sections define these public-facing parts of this KIP, including an overall UML diagram and important code details (with Javadoc) where appropriate.
...
OAuth Bearer Tokens and Token Retrieval
See Rejected Alternatives: Explicit Configuration of Token Refresh Class
See Rejected Alternatives: Exceptions
We define a small exception hierarchy to cover the various cases related to the SASL/OAUTHBEARER code.
...
Callback Handlers and Callbacks
We define org.apache.kafka.common.security.oauthbearer.OAuthBearerToken to be the interface that all OAuth 2 bearer tokens must implement within the context of Kafka's SASL/OAUTHBEARER implementation. Scenarios that leverage open source JWT/JWS/JWE implementations must wrap the library's implementation of a token to implement this interface.
The org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule class is the JAAS login module that is declared in the JAAS configuration. When a client (whether a non-broker client or a broker when SASL/OAUTHBEARER is the inter-broker protocol) connects to Kafka the OAuthBearerLoginModule instance asks its configured AuthenticateCallbackHandler implementation to handle an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback and retrieve/return an instance of OAuthBearerToken. A default, builtin AuthenticateCallbackHandler implementation creates an unsecured token as defined by the JAAS module options – see below for configuration details – when another implementation is not explicitly specified. Production use cases will require writing an implementation of AuthenticateCallbackHandler that can handle an instance of OAuthBearerTokenCallback and declaring it via either the sasl.login.callback.handler.class configuration option for a non-broker client or via the listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class configuration option for brokers (when SASL/OAUTHBEARER is the inter-broker protocol).
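The handler contract described above can be sketched using JDK types only. This is a minimal illustration under stated assumptions, not Kafka's implementation: `TokenCallbackStandIn` is a hypothetical stand-in for OAuthBearerTokenCallback, and `StaticTokenHandlerSketch` is a hypothetical class playing the role of a custom AuthenticateCallbackHandler (any additional lifecycle methods of the real interface are omitted). A production handler would obtain the token from an authorization server rather than from a constructor argument.

```java
import java.io.IOException;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.UnsupportedCallbackException;

/** Hypothetical stand-in for OAuthBearerTokenCallback; holds a compact token string. */
final class TokenCallbackStandIn implements Callback {
    private String token;

    String token() {
        return token;
    }

    void token(String token) {
        this.token = token;
    }
}

/** Hypothetical handler that answers token callbacks with a fixed value. */
final class StaticTokenHandlerSketch implements CallbackHandler {
    private final String compactToken;

    StaticTokenHandlerSketch(String compactToken) {
        this.compactToken = compactToken;
    }

    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        for (Callback callback : callbacks) {
            if (callback instanceof TokenCallbackStandIn) {
                // Supply the token to the login module via the callback.
                ((TokenCallbackStandIn) callback).token(compactToken);
            } else {
                // Reject any callback type this handler does not understand.
                throw new UnsupportedCallbackException(callback);
            }
        }
    }
}
```

The login module only ever sees the callback, so the way the token is obtained stays entirely inside the handler.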
Here are the parameters that can be provided as part of the JAAS configuration to determine the claims that appear in an unsecured OAuth Bearer Token generated by the default, out-of-the-box AuthenticateCallbackHandler implementation.
| JAAS Module Option for Unsecured Token Retrieval | Documentation |
|---|---|
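unsecuredLoginStringClaim_<claimname>="value" | Creates a String claim with the given name and value. Any valid claim name can be specified except 'iat' and 'exp' (these are automatically generated). |
unsecuredLoginNumberClaim_<claimname>="value" | Creates a Number claim with the given name and value. Any valid claim name can be specified except 'iat' and 'exp' (these are automatically generated). |
unsecuredLoginListClaim_<claimname>="value" | Creates a String List claim with the given name and values parsed from the given value where the first character is taken as the delimiter. For example: unsecuredLoginListClaim_fubar="\|value1\|value2". Any valid claim name can be specified except 'iat' and 'exp' (these are automatically generated). |
unsecuredLoginPrincipalClaimName | Set to a custom claim name if you wish the name of the String claim holding the principal name to be something other than 'sub'. |
unsecuredLoginLifetimeSeconds | Set to an integer value if the token expiration is to be set to something other than the default value of 3600 seconds (which is 1 hour). The 'exp' claim will be set to reflect the expiration time. |
unsecuredLoginScopeClaimName | Set to a custom claim name if you wish the name of the String or String List claim holding any token scope to be something other than 'scope'. |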
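To make the options above concrete, here is a minimal sketch of how an unsecured JWT could be assembled from a principal claim and a lifetime, and how a list-claim option value could be split on its leading delimiter. The class and method names are hypothetical, this is not the code Kafka ships, and the sketch assumes claim names and values contain no characters that need JSON escaping. The compact serialization follows the unsecured JWT form (an "alg" of "none" and an empty signature part).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical helper illustrating the unsecured login options. */
final class UnsecuredJwtSketch {

    /** Builds "header.claims." with an empty signature part. */
    static String compactSerialization(String principalClaimName, String principalName,
            long issuedAtSeconds, long lifetimeSeconds) {
        String header = "{\"alg\":\"none\"}";
        // 'iat' and 'exp' are generated here, which is why they cannot be set explicitly.
        String claims = "{\"" + principalClaimName + "\":\"" + principalName + "\","
                + "\"iat\":" + issuedAtSeconds + ","
                + "\"exp\":" + (issuedAtSeconds + lifetimeSeconds) + "}";
        Base64.Encoder encoder = Base64.getUrlEncoder().withoutPadding();
        return encoder.encodeToString(header.getBytes(StandardCharsets.UTF_8)) + "."
                + encoder.encodeToString(claims.getBytes(StandardCharsets.UTF_8)) + ".";
    }

    /** Splits a list-claim option value, treating its first character as the delimiter. */
    static List<String> parseListClaim(String optionValue) {
        if (optionValue.length() < 2) {
            return new ArrayList<>();
        }
        String delimiter = optionValue.substring(0, 1);
        String rest = optionValue.substring(1);
        return new ArrayList<>(Arrays.asList(rest.split(Pattern.quote(delimiter))));
    }
}
```

For example, `parseListClaim("|value1|value2")` yields the two values `value1` and `value2`.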
Here is a typical, basic JAAS configuration for a client leveraging unsecured SASL/OAUTHBEARER authentication:
KafkaClient {
    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
        unsecuredLoginStringClaim_sub="thePrincipalName";
};
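The same settings can also be supplied programmatically. The sketch below assumes the standard client properties `security.protocol`, `sasl.mechanism` and `sasl.jaas.config` are used in place of a JAAS file; the class name, bootstrap address and principal name are placeholders chosen for illustration.

```java
import java.util.Properties;

/** Hypothetical helper that builds client properties for unsecured SASL/OAUTHBEARER. */
final class UnsecuredOAuthBearerClientConfig {

    static Properties clientProperties(String bootstrapServers, String principalName) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "OAUTHBEARER");
        // Inline equivalent of the KafkaClient JAAS section: one login module entry.
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required "
                        + "unsecuredLoginStringClaim_sub=\"" + principalName + "\";");
        return props;
    }
}
```

The resulting `Properties` object would then be passed to a producer or consumer constructor in the usual way.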
An implementation of the org.apache.kafka.common.security.auth.Login interface specific to the OAUTHBEARER mechanism is automatically applied; it periodically refreshes any token before it expires so that the client can continue to make connections to brokers. The parameters that impact how the refresh algorithm operates are specified as part of the producer/consumer/broker configuration; they are as follows (the defaults are generally reasonable, so explicit configuration may not be necessary):
| Producer/Consumer/Broker Configuration Property | Documentation |
|---|---|
sasl.login.refresh.window.factor | Login refresh thread will sleep until the specified window factor relative to the credential's lifetime has been reached, at which time it will try to refresh the credential. Legal values are between 0.5 (50%) and 1.0 (100%) inclusive; a default value of 0.8 (80%) is used if no value is specified. Currently applies only to OAUTHBEARER. |
sasl.login.refresh.window.jitter | The maximum amount of random jitter relative to the credential's lifetime that is added to the login refresh thread's sleep time. Legal values are between 0 and 0.25 (25%) inclusive; a default value of 0.05 (5%) is used if no value is specified. Currently applies only to OAUTHBEARER. |
sasl.login.refresh.min.period.seconds | The desired minimum time for the login refresh thread to wait before refreshing a credential, in seconds. Legal values are between 0 and 900 (15 minutes); a default value of 60 (1 minute) is used if no value is specified. This value and sasl.login.refresh.buffer.seconds are both ignored if their sum exceeds the remaining lifetime of a credential. Currently applies only to OAUTHBEARER. |
sasl.login.refresh.buffer.seconds | The amount of buffer time before credential expiration to maintain when refreshing a credential, in seconds. If a refresh would otherwise occur closer to expiration than the number of buffer seconds then the refresh will be moved up to maintain as much of the buffer time as possible, overriding all other considerations. Legal values are between 0 and 3600 (1 hour); a default value of 300 (5 minutes) is used if no value is specified. This value and sasl.login.refresh.min.period.seconds are both ignored if their sum exceeds the remaining lifetime of a credential. Currently applies only to OAUTHBEARER. |
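One way to read how these four properties interact is sketched below. This is an interpretation of the table above rather than the refresh algorithm Kafka actually implements; the class and method names are hypothetical, and the random jitter is passed in as an already-drawn fraction so that the calculation is deterministic.

```java
/** Hypothetical illustration of how the refresh properties could combine. */
final class RefreshTimeSketch {

    /**
     * Returns the time (ms since the epoch) at which a refresh would be attempted.
     * The jitter argument is a fraction of the lifetime, already drawn from
     * [0, sasl.login.refresh.window.jitter].
     */
    static long refreshAtMs(long nowMs, long startMs, long expireMs, double windowFactor,
            double jitter, long minPeriodSeconds, long bufferSeconds) {
        long lifetimeMs = expireMs - startMs;
        long proposedMs = startMs + Math.round(lifetimeMs * (windowFactor + jitter));
        long minPeriodMs = minPeriodSeconds * 1000L;
        long bufferMs = bufferSeconds * 1000L;
        if (minPeriodMs + bufferMs > expireMs - nowMs) {
            // Both values are ignored when their sum exceeds the remaining lifetime.
            return Math.min(proposedMs, expireMs);
        }
        long notBeforeMs = nowMs + minPeriodMs;
        long notAfterMs = expireMs - bufferMs;
        // The buffer is applied last because it overrides all other considerations.
        return Math.min(Math.max(proposedMs, notBeforeMs), notAfterMs);
    }
}
```

With the defaults and a one-hour token issued at time 0, the window factor of 0.8 places the refresh at 48 minutes (before jitter), well clear of both the one-minute minimum period and the five-minute buffer.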
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer;

import java.util.Set;

/**
 * The <code>b64token</code> value as defined in
 * <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750 Section
 * 2.1</a> along with the token's specific scope and lifetime and principal
 * name.
 * <p>
 * A network request would be required to re-hydrate an opaque token, and that
 * could result in (for example) an {@code IOException}, but retrievers for
 * various attributes ({@link #scope()}, {@link #lifetimeMs()}, etc.) declare no
 * exceptions. Therefore, if a network request is required for any of these
 * retriever methods, that request could be performed at construction time so
 * that the various attributes can be reliably provided thereafter. For example,
 * a constructor might declare {@code throws IOException} in such a case.
 * Alternatively, the retrievers could throw unchecked exceptions.
 *
 * @see <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC 6749
 *      Section 1.4</a> and
 *      <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750
 *      Section 2.1</a>
 */
public interface OAuthBearerToken {
    /**
     * The <code>b64token</code> value as defined in
     * <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750 Section
     * 2.1</a>
     *
     * @return <code>b64token</code> value as defined in
     *         <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750
     *         Section 2.1</a>
     */
    String value();

    /**
     * The token's scope of access, as per
     * <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC 6749 Section
     * 1.4</a>
     *
     * @return the token's (always non-null but potentially empty) scope of access,
     *         as per <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC
     *         6749 Section 1.4</a>. Note that all values in the returned set will
     *         be trimmed of preceding and trailing whitespace, and the result will
     *         never contain the empty string.
     */
    Set<String> scope();

    /**
     * The token's lifetime, expressed as the number of milliseconds since the
     * epoch, as per
     * <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC 6749 Section
     * 1.4</a>
     *
     * @return the token's lifetime, expressed as the number of milliseconds since
     *         the epoch, as per
     *         <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC 6749
     *         Section 1.4</a>.
     */
    long lifetimeMs();

    /**
     * The name of the principal to which this credential applies
     *
     * @return the always non-null/non-empty principal name
     */
    String principalName();

    /**
     * When the credential became valid, in terms of the number of milliseconds
     * since the epoch, if known, otherwise null. An expiring credential may not
     * necessarily indicate when it was created -- just when it expires -- so we
     * need to support a null return value here.
     *
     * @return the time when the credential became valid, in terms of the number of
     *         milliseconds since the epoch, if known, otherwise null
     */
    Long startTimeMs();
} |
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer;

/**
 * Exception thrown when there is a problem with the configuration (an invalid
 * option in a JAAS config, for example).
 */
public class OAuthBearerConfigException extends OAuthBearerException {
    // etc...
} |
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer;

import java.util.Objects;

/**
 * Exception thrown when token validation fails due to a problem with the token
 * itself (as opposed to a missing remote resource or a configuration problem)
 */
public class OAuthBearerIllegalTokenException extends OAuthBearerException {
    private final OAuthBearerValidationResult reason;

    /**
     * Constructor
     *
     * @param reason
     *            the mandatory reason for the validation failure; it must indicate
     *            failure
     */
    public OAuthBearerIllegalTokenException(OAuthBearerValidationResult reason) {
        super(Objects.requireNonNull(reason).failureDescription());
        if (reason.success())
            throw new IllegalArgumentException("The reason indicates success; it must instead indicate failure");
        this.reason = reason;
    }

    /**
     * Return the (always non-null) reason for the validation failure
     *
     * @return the reason for the validation failure
     */
    public OAuthBearerValidationResult reason() {
        return reason;
    }
} |
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer;

import java.io.Serializable;

/**
 * The result of some kind of token validation
 */
public class OAuthBearerValidationResult implements Serializable {
    // etc...

    /**
     * Return an instance indicating success
     *
     * @return an instance indicating success
     */
    public static OAuthBearerValidationResult newSuccess() {
        return new OAuthBearerValidationResult(true, null, null, null);
    }

    /**
     * Return a new validation failure instance
     *
     * @param failureDescription
     *            optional description of the failure
     * @return a new validation failure instance
     */
    public static OAuthBearerValidationResult newFailure(String failureDescription) {
        return newFailure(failureDescription, null, null);
    }

    /**
     * Return a new validation failure instance
     *
     * @param failureDescription
     *            optional description of the failure
     * @param failureScope
     *            optional scope to be reported with the failure
     * @param failureOpenIdConfig
     *            optional OpenID Connect configuration to be reported with the
     *            failure
     * @return a new validation failure instance
     */
    public static OAuthBearerValidationResult newFailure(String failureDescription, String failureScope,
            String failureOpenIdConfig) {
        return new OAuthBearerValidationResult(false, failureDescription, failureScope, failureOpenIdConfig);
    }

    private OAuthBearerValidationResult(boolean success, String failureDescription, String failureScope,
            String failureOpenIdConfig) {
        // etc...
    }

    /**
     * Return true if this instance indicates success, otherwise false
     *
     * @return true if this instance indicates success, otherwise false
     */
    public boolean success() {
        return success;
    }

    /**
     * Return the (potentially null) descriptive message for the failure
     *
     * @return the (potentially null) descriptive message for the failure
     */
    public String failureDescription() {
        return failureDescription;
    }

    /**
     * Return the (potentially null) scope to be reported with the failure
     *
     * @return the (potentially null) scope to be reported with the failure
     */
    public String failureScope() {
        return failureScope;
    }

    /**
     * Return the (potentially null) OpenID Connect configuration to be reported
     * with the failure
     *
     * @return the (potentially null) OpenID Connect configuration to be reported
     *         with the failure
     */
    public String failureOpenIdConfig() {
        return failureOpenIdConfig;
    }

    /**
     * Raise an exception if this instance indicates failure, otherwise do nothing
     *
     * @throws OAuthBearerIllegalTokenException
     *             if this instance indicates failure
     */
    public void throwExceptionIfFailed() throws OAuthBearerIllegalTokenException {
        if (!success())
            throw new OAuthBearerIllegalTokenException(this);
    }
} |
Token Refresh
See Rejected Alternatives: Token Refresh
...
| Code Block | ||||
|---|---|---|---|---|
|
...
| |||
package org.apache.kafka.common.security.oauthbearer |
...
Note that the token refresh functionality and the related classes and interfaces are not necessarily specific to OAuth Bearer Tokens, so an open question is whether this refresh-related code belongs in a sub-package of the main SASL/OAUTHBEARER one or if it should live somewhere else. If this functionality ends up elsewhere then perhaps RefreshConfigProp.parseValue(Object) should not throw an OAuthBearerConfigException but rather a different exception type.
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer.refresh;
/**
* A credential that expires and that can potentially be refreshed
*
* @see ExpiringCredentialRefreshingLogin
*/
public interface ExpiringCredential {
/**
* The name of the principal to which this credential applies (used only for
* logging)
*
* @return the always non-null/non-empty principal name
*/
String principalName();
/**
* When the credential became valid, in terms of the number of milliseconds
* since the epoch, if known otherwise null
*
* @return the time when the credential became valid, in terms of the number of
* milliseconds since the epoch, if known otherwise null
*/
Long startTimeMillis();
/**
* When the credential expires, in terms of the number of milliseconds since the
* epoch
*
* @return the time when the credential expires, in terms of the number of
* milliseconds since the epoch
*/
long expireTimeMillis();
/**
* The point after which the credential can no longer be refreshed, in terms of
* the number of milliseconds since the epoch, if any, otherwise null.
*
* @return the point after which the credential can no longer be refreshed, in
* terms of the number of milliseconds since the epoch, if any,
* otherwise null
*/
Long absoluteLastRefreshMillis();
} |
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer.refresh;

import java.util.List;

/**
 * Individual refresh-related configuration properties defining how
 * {@link ExpiringCredentialRefreshingLogin} refreshes instances of
 * {@link ExpiringCredential}. Each value has a type, optional min/max/default
 * values, an ordered list of string keys (containing the enum name and the
 * camelCase version of the name, in that order) so it can be picked out of a
 * map keyed with Strings, and the ability to parse a string or non-string value
 * associated with one of its string keys in a map.
 *
 * @see RefreshConfig
 */
public enum RefreshConfigProp {
    /**
     * Background login refresh thread will sleep until the specified window factor
     * relative to the credential's total lifetime has been reached, at which time
     * it will try to refresh the credential. The default value is 0.8 (80%).
     */
    REFRESH_WINDOW_FACTOR(Double.class, 0.5, 1.0, 0.8),
    /**
     * Amount of random jitter added to the background login refresh thread's sleep
     * time. The default value is 0.05 (5%).
     */
    REFRESH_WINDOW_JITTER(Double.class, 0.0, 0.25, 0.05),
    /**
     * The minimum time between checks by the background login refresh thread,
     * regardless of other constraints, in milliseconds. The default value is 60,000
     * (1 minute).
     */
    REFRESH_MIN_PERIOD_MILLIS(Long.class, 0L, 1000L * 60 * 15, 1000L * 60 * 1),
    /**
     * If the {@code LoginModule} and {@code SaslClient} implementations support
     * multiple simultaneous login contexts on a single {@code Subject} at the same
     * time. If true, then upon refresh, logout will only be invoked on the original
     * {@code LoginContext} after a new one successfully logs in. This can be
     * helpful if the original credential still has some lifetime left when an
     * attempt to refresh the credential fails; the client will still be able to
     * create new connections as long as the original credential remains valid.
     * Otherwise, if logout is immediately invoked prior to relogin, a relogin
     * failure leaves the client without the ability to connect until relogin does
     * in fact succeed. The default value is false.
     */
    RELOGIN_ALLOWED_BEFORE_LOGOUT(Boolean.class, Boolean.FALSE, Boolean.TRUE, Boolean.FALSE);

    private RefreshConfigProp(Class<? extends Comparable<?>> valueType, Comparable<?> minValue,
            Comparable<?> maxValue, Comparable<?> defaultValue) {
        // etc...
    }

    /**
     * Return the always non-null value type
     *
     * @return the always non-null value type
     */
    public Class<? extends Comparable<?>> valueType() {
        return valueType;
    }

    /**
     * Return the minimum value, if any, as an instance of the value type, otherwise
     * null
     *
     * @return the minimum value, if any, as an instance of the value type,
     *         otherwise null
     */
    public Comparable<?> minValue() {
        return minValue;
    }

    /**
     * Return the maximum value, if any, as an instance of the value type, otherwise
     * null
     *
     * @return the maximum value, if any, as an instance of the value type,
     *         otherwise null
     */
    public Comparable<?> maxValue() {
        return maxValue;
    }

    /**
     * Return the default value, if any, as an instance of the value type, otherwise
     * null
     *
     * @return the default value, if any, as an instance of the value type,
     *         otherwise null
     */
    public Comparable<?> defaultValue() {
        return defaultValue;
    }

    /**
     * The ordered list of string keys (containing the enum name and the camelCase
     * version of the name, in that order) so the instance can be picked out of a
     * map keyed with Strings
     *
     * @return the always non-null/non-empty list of string keys (containing the
     *         enum name and the camelCase version of the name, in that order) that
     *         denote this instance in a {@code Map}
     */
    public List<String> stringKeys() {
        return stringKeys;
    }

    /**
     * Return the (potentially null) value as the value type
     *
     * @param value
     *            a (potentially null) value of either the value type or a type such
     *            that the result of invoking {@code valueOf()} on the value type
     *            class against the value results in the value converted to the
     *            value type
     * @return the (potentially null) value as the value type
     * @throws OAuthBearerConfigException
     *             if a non-null value cannot be converted to the value type while
     *             remaining consistent with any min/max constraints
     */
    public Comparable<?> parseValue(Object value) throws OAuthBearerConfigException {
        // etc...
    }
    // etc...
} |
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer;

import javax.security.auth.spi.LoginModule;

/**
 * The {@code LoginModule} for the SASL/OAUTHBEARER mechanism. When a client
 * (whether a non-broker client or a broker when SASL/OAUTHBEARER is the
 * inter-broker protocol) connects to Kafka the {@code OAuthBearerLoginModule}
 * instance asks its configured {@link AuthenticateCallbackHandler}
 * implementation to handle an instance of {@link OAuthBearerTokenCallback} and
 * return an instance of {@link OAuthBearerToken}. A default, builtin
 * {@link AuthenticateCallbackHandler} implementation creates an unsecured token
 * as defined by these JAAS module options:
 * <table>
 * <tr>
 * <th>JAAS Module Option for Unsecured Token Retrieval</th>
 * <th>Documentation</th>
 * </tr>
 * <tr>
 * <td>{@code unsecuredLoginStringClaim_<claimname>="value"}</td>
 * <td>Creates a {@code String} claim with the given name and value. Any valid
 * claim name can be specified except '{@code iat}' and '{@code exp}' (these are
 * automatically generated).</td>
 * </tr>
 * <tr>
 * <td>{@code unsecuredLoginNumberClaim_<claimname>="value"}</td>
 * <td>Creates a {@code Number} claim with the given name and value. Any valid
 * claim name can be specified except '{@code iat}' and '{@code exp}' (these are
 * automatically generated).</td>
 * </tr>
 * <tr>
 * <td>{@code unsecuredLoginListClaim_<claimname>="value"}</td>
 * <td>Creates a {@code String List} claim with the given name and values parsed
 * from the given value where the first character is taken as the delimiter. For
 * example: {@code unsecuredLoginListClaim_fubar="|value1|value2"}. Any valid
 * claim name can be specified except '{@code iat}' and '{@code exp}' (these are
 * automatically generated).</td>
 * </tr>
 * <tr>
 * <td>{@code unsecuredLoginPrincipalClaimName}</td>
 * <td>Set to a custom claim name if you wish the name of the String claim
 * holding the principal name to be something other than '{@code sub}'.</td>
 * </tr>
 * <tr>
 * <td>{@code unsecuredLoginLifetimeSeconds}</td>
 * <td>Set to an integer value if the token expiration is to be set to something
 * other than the default value of 3600 seconds (which is 1 hour). The
 * '{@code exp}' claim will be set to reflect the expiration time.</td>
 * </tr>
 * <tr>
 * <td>{@code unsecuredLoginScopeClaimName}</td>
 * <td>Set to a custom claim name if you wish the name of the String or String
 * List claim holding any token scope to be something other than
 * '{@code scope}'.</td>
 * </tr>
 * </table>
 * Production use cases will require writing an implementation of
 * {@link AuthenticateCallbackHandler} that can handle an instance of
 * {@link OAuthBearerTokenCallback} and declaring it via either the
 * {@code sasl.login.callback.handler.class} configuration option for a
 * non-broker client or via the
 * {@code listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class}
 * configuration option for brokers (when SASL/OAUTHBEARER is the inter-broker
 * protocol).
 * <p>
 * Here is a typical, basic JAAS configuration for a client leveraging unsecured
 * SASL/OAUTHBEARER authentication:
 *
 * <pre>
 * KafkaClient {
 *      org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
 *      unsecuredLoginStringClaim_sub="thePrincipalName";
 * };
 * </pre>
 *
 * An implementation of the {@link Login} interface specific to the
 * {@code OAUTHBEARER} mechanism is automatically applied; it periodically
 * refreshes any token before it expires so that the client can continue to make
 * connections to brokers. The parameters that impact how the refresh algorithm
 * operates are specified as part of the producer/consumer/broker configuration
 * and are as follows. See the documentation for these properties elsewhere for
 * details.
 * <table>
 * <tr>
 * <th>Producer/Consumer/Broker Configuration Property</th>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.window.factor}</td>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.window.jitter}</td>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.min.period.seconds}</td>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.buffer.seconds}</td>
 * </tr>
 * </table>
 * When a broker accepts a SASL/OAUTHBEARER connection the instance of the
 * builtin {@code SaslServer} implementation asks its configured
 * {@link AuthenticateCallbackHandler} implementation to handle an instance of
 * {@link OAuthBearerValidatorCallback} constructed with the OAuth 2 Bearer
 * Token's compact serialization and return an instance of
 * {@link OAuthBearerToken} if the value validates. A default, builtin
 * {@link AuthenticateCallbackHandler} implementation validates an unsecured
 * token as defined by these JAAS module options:
 * <table>
 * <tr>
 * <th>JAAS Module Option for Unsecured Token Validation</th>
 * <th>Documentation</th>
 * </tr>
 * <tr>
 * <td>{@code unsecuredValidatorPrincipalClaimName="value"}</td>
 * <td>Set to a non-empty value if you wish a particular String claim holding a
 * principal name to be checked for existence; the default is to check for the
 * existence of the '{@code sub}' claim.</td>
 * </tr>
 * <tr>
 * <td>{@code unsecuredValidatorScopeClaimName="value"}</td>
 * <td>Set to a custom claim name if you wish the name of the String or String
 * List claim holding any token scope to be something other than
 * '{@code scope}'.</td>
 * </tr>
 * <tr>
 * <td>{@code unsecuredValidatorRequiredScope="value"}</td>
 * <td>Set to a space-delimited list of scope values if you wish the
 * String/String List claim holding the token scope to be checked to make sure
 * it contains certain values.</td>
 * </tr>
 * <tr>
 * <td>{@code unsecuredValidatorAllowableClockSkewMs="value"}</td>
 * <td>Set to a positive integer value if you wish to allow up to some number of
 * positive milliseconds of clock skew (the default is 0).</td>
 * </tr>
 * </table>
 * Here is a typical, basic JAAS configuration for a broker leveraging unsecured
 * SASL/OAUTHBEARER validation:
 *
 * <pre>
 * KafkaServer {
 *      org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
 *      unsecuredLoginStringClaim_sub="thePrincipalName";
 * };
 * </pre>
 *
 * Production use cases will require writing an implementation of
 * {@link AuthenticateCallbackHandler} that can handle an instance of
 * {@link OAuthBearerValidatorCallback} and declaring it via the
 * {@code listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class}
 * configuration option.
 *
 * @see SaslConfigs#SASL_LOGIN_REFRESH_WINDOW_FACTOR_DOC
 * @see SaslConfigs#SASL_LOGIN_REFRESH_WINDOW_JITTER_DOC
 * @see SaslConfigs#SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS_DOC
 * @see SaslConfigs#SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC
 */
public class OAuthBearerLoginModule implements LoginModule {
    static {
        OAuthBearerSaslClientProvider.initialize(); // not part of public API
        OAuthBearerSaslServerProvider.initialize(); // not part of public API
    }
    // etc...
} |
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer;

import java.util.Objects;
import javax.security.auth.callback.Callback;

/**
 * A {@code Callback} for use by the {@code SaslClient} and {@code Login}
 * implementations when they require an OAuth 2 bearer token. Callback handlers
 * should use the {@link #error(String, String, String)} method to communicate
 * errors returned by the authorization server as per
 * <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
 * 2.0 Authorization Framework</a>. Callback handlers should communicate other
 * problems by raising an {@code IOException}.
 */
public class OAuthBearerTokenCallback implements Callback {
    private OAuthBearerToken token = null;
    private String errorCode = null;
    private String errorDescription = null;
    private String errorUri = null;

    /**
     * Return the (potentially null) token
     *
     * @return the (potentially null) token
     */
    public OAuthBearerToken token() {
        return token;
    }

    /**
     * Return the (always non-empty) error code as per
     * <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
     * 2.0 Authorization Framework</a>.
     *
     * @return the (always non-empty) error code
     */
    public String errorCode() {
        return errorCode;
    }

    /**
     * Return the (potentially null) error description as per
     * <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
     * 2.0 Authorization Framework</a>.
     *
     * @return the (potentially null) error description
     */
    public String errorDescription() {
        return errorDescription;
    }

    /**
     * Return the (potentially null) error URI as per
     * <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
     * 2.0 Authorization Framework</a>.
     *
     * @return the (potentially null) error URI
     */
    public String errorUri() {
        return errorUri;
    }

    /**
     * Set the token. All error-related values are cleared.
     *
     * @param token
     *            the mandatory token to set
     */
    public void token(OAuthBearerToken token) {
        this.token = Objects.requireNonNull(token);
        this.errorCode = null;
        this.errorDescription = null;
        this.errorUri = null;
    }

    /**
     * Set the error values as per
     * <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
     * 2.0 Authorization Framework</a>. Any token is cleared.
     *
     * @param errorCode
     *            the mandatory error code to set
     * @param errorDescription
     *            the optional error description to set
     * @param errorUri
     *            the optional error URI to set
     */
    public void error(String errorCode, String errorDescription, String errorUri) {
        if (Objects.requireNonNull(errorCode).isEmpty())
            throw new IllegalArgumentException("error code must not be empty");
        this.errorCode = errorCode;
        this.errorDescription = errorDescription;
        this.errorUri = errorUri;
        this.token = null;
    }
} |
Token Validation
See Rejected Alternatives: Callback Handlers and Callbacks
We define the org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback class as the callback class for communicating that we want to validate a bearer token compact serialization provided by the connecting client. When a broker accepts a SASL/OAUTHBEARER connection, the instance of the built-in SaslServer implementation asks its configured AuthenticateCallbackHandler implementation to handle an instance of OAuthBearerValidatorCallback constructed with the OAuth 2 Bearer Token's compact serialization and to return an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerToken if the compact serialization validates. A default, built-in AuthenticateCallbackHandler implementation validates an unsecured token as defined by these JAAS module options:
| JAAS Module Option for Unsecured Token Validation | Documentation |
|---|---|
| unsecuredValidatorPrincipalClaimName="value" | Set to a non-empty value if you wish a particular String claim holding |
| unsecuredValidatorScopeClaimName="value" | Set to a custom claim name if you wish the name of the String or String |
| unsecuredValidatorRequiredScope="value" | Set to a space-delimited list of scope values if you wish the |
| unsecuredValidatorAllowableClockSkewMs="value" | Set to a positive integer value if you wish to allow up to some number of positive milliseconds of clock skew (the default is 0). |
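The clock skew option above can be illustrated with a small sketch. This is not the built-in validator; it only shows one plausible way to apply an allowable skew to the 'exp' and 'nbf' claims, assuming they hold NumericDate values in seconds as in RFC 7519. The class name ClockSkewSketch, the method name isTimeValid, and the choice to treat an absent claim as passing are assumptions made for illustration.

```java
public final class ClockSkewSketch {
    private ClockSkewSketch() {
    }

    /**
     * Hypothetical helper: decide whether a token is currently usable.
     *
     * @param nowMs                current time in milliseconds since the epoch
     * @param expSeconds           'exp' claim in seconds, or null if absent
     * @param nbfSeconds           'nbf' claim in seconds, or null if absent
     * @param allowableClockSkewMs non-negative skew allowance in milliseconds
     */
    public static boolean isTimeValid(long nowMs, Number expSeconds, Number nbfSeconds,
            long allowableClockSkewMs) {
        if (allowableClockSkewMs < 0)
            throw new IllegalArgumentException("clock skew must not be negative");
        if (expSeconds != null) {
            long expMs = Math.round(expSeconds.doubleValue() * 1000);
            // Expired once "now", shifted back by the skew, reaches the expiration time.
            if (nowMs - allowableClockSkewMs >= expMs)
                return false;
        }
        if (nbfSeconds != null) {
            long nbfMs = Math.round(nbfSeconds.doubleValue() * 1000);
            // Not yet valid while "now", shifted forward by the skew, is before 'nbf'.
            if (nowMs + allowableClockSkewMs < nbfMs)
                return false;
        }
        return true;
    }
}
```

With a skew of 0 the checks reduce to the plain RFC 7519 rules: the current time must be before 'exp' and at or after 'nbf'.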
Here is a typical, basic JAAS configuration for a broker leveraging unsecured SASL/OAUTHBEARER authentication:
KafkaServer {
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
unsecuredLoginStringClaim_sub="thePrincipalName";
};
Production use cases will require writing an implementation of AuthenticateCallbackHandler that can handle an instance of OAuthBearerValidatorCallback and declaring it via the listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class configuration option.
The validated token will be available as a negotiated property on the SaslServer instance with the key OAUTHBEARER.token so it can be used for authorization as per KIP-189: Improve principal builder interface and add support for SASL. Note that the implementation of SaslServer is not part of the public interface – just the key where it makes the validated token available.
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer.refresh;
/**
* Immutable refresh-related configuration for instances of
* {@link ExpiringCredentialRefreshingLogin}. Configuration that is independent
* of the actual credential itself and that can be defined as login module
* options in a JAAS config should be stored here.
*/
public class RefreshConfig {
/**
* Default constructor with individual refresh configuration properties
* being set to their default values
*/
public RefreshConfig() {
this(Collections.<String, String>emptyMap(), "");
}
/**
* Constructor based on a map with keys being the String keys associated with
* {@link RefreshConfigProp} instances and values being either Strings or
* non-Strings. Individual refresh configuration properties that are not
* explicitly set to a valid value on the given map will be set to their default
* value for this instance.
*
* @param configMap
* the mandatory (but possibly empty) configuration map upon which to
* build this instance
* @see RefreshConfigProp#stringKeys()
* @see RefreshConfigProp#parseValue(Object)
*/
public RefreshConfig(Map<String, String> configMap) {
this(configMap, "");
}
/**
* Constructor based on a map with keys being the String keys associated with
* {@link RefreshConfigProp} instances and values being either Strings or
* non-Strings. Individual refresh configuration properties that are not
* explicitly set to a valid value on the given map will be set to their default
* value for this instance.
*
* @param configMap
* the mandatory (but possibly empty) configuration map upon which to
* build this instance
* @param keyPrefix
* the mandatory (but potentially blank) prefix to prepend to String
* keys
* @see RefreshConfigProp#stringKeys()
* @see RefreshConfigProp#parseValue(Object)
*/
public RefreshConfig(Map<String, String> configMap, String keyPrefix) {
// etc...
}
public Map<RefreshConfigProp, Object> refreshConfigMap() {
return refreshConfigMap;
}
public double refreshWindowFactor() {
return refreshWindowFactor;
}
public double refreshWindowJitter() {
return refreshWindowJitter;
}
public long refreshMinPeriodMillis() {
return refreshMinPeriodMillis;
}
public boolean reloginAllowedBeforeLogout() {
return reloginAllowedBeforeLogout;
}
// etc...
} |
| Code Block | ||||||
| ||||||
package org.apache.kafka.common.security.oauthbearer.refresh;
import javax.security.auth.spi.LoginModule;
/**
* An extension of the {@code LoginModule} interface that must be implemented by
* any login module that generates an instance of {@link ExpiringCredential} to
* be managed/refreshed by {@link ExpiringCredentialRefreshingLogin}. The
* existence of this interface is necessary to deal with the case when there are
* multiple enabled SASL mechanisms, one or more of which generate an expiring
* credential, and a SASL mechanism (as opposed to authentication via SSL) is
* used for inter-broker communication.
* <p>
* The following broker-side JAAS configuration helps illustrate the need for
* this interface:
*
* <pre>
* KafkaServer {
* org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
* // etc...;
* some.othersaslmechanism.OtherSaslMechanismLoginModule Optional
* // etc...;
* };
* </pre>
*
* The {@code LoginContext} instance will initialize both login modules and ask
* them both to login -- that's how JAAS works -- regardless of which SASL
* mechanism is used for inter-broker communication. It is imperative that the
* login succeeds for the login module associated with the mechanism configured
* for inter-broker communication; it doesn't matter if any other mechanisms
* fail because they aren't actually being used for client-side work (they are
* only being used for the server side of the SASL handshake, and that is
* performed by the {@code SaslServer} instance rather than the
* {@code LoginModule} instance). This has 2 implications:
* <p>
* <ol>
* <li>The {@code CallbackHandler} instance provided to the {@code LoginContext}
* instance (which then passes it to all of the login modules) must be the
* correct one for the login module that must succeed; it does not have to be
* the correct one for any others.
* <li>The login modules that don't have to succeed (because they aren't being
* used for inter-broker communication) should be marked {@code Optional} in
* case they fail.
* </ol>
* <p>
* This raises the critical issue of how any instance of {@link Login} can know
* which {@code CallbackHandler} instance to instantiate. The
* {@link ScramLoginModule} and {@link PlainLoginModule} don't use the callback
* handler, but {@code com.sun.security.auth.module.Krb5LoginModule} does, and
* in fact {@link LoginCallbackHandler} serves the purpose of short-circuiting
* any request for user interaction in that case. So this issue hasn't been
* critical in the past; it is only now becoming more important.
* <p>
* All the {@code Login} instance knows is the SASL mechanism enabled for
* inter-broker communication (from the broker config) and the JAAS
* configuration (for example, the one shown above). It cannot know which
* {@code CallbackHandler} instance to instantiate from just that information
* because it cannot determine from that information alone which of the login
* modules handles the declared inter-broker communication mechanism.
* <p>
* We thus arrive at the need for this interface. A {@code Login} instance can
* look at all of the declared login module classes, determine which of them
* implements this interface, and then for each of those it can instantiate an
* instance using the default constructor and ask for its applicable mechanisms
* via {@link #mechanisms()}. If it finds a login module with an applicable SASL
* mechanism matching the one being used for inter-broker communication it can
* then ask for a {@code CallbackHandler} instance via
* {@link #newCallbackHandler()}. Otherwise, if it doesn't find a match, it just
* instantiates an instance of {@link LoginCallbackHandler}, which is what
* {@code AbstractLogin#login()} currently creates and is required for the initial
* Kerberos login attempt for short-circuit behavior as mentioned above.
*/
public interface ExpiringCredentialLoginModule extends LoginModule {
/**
* Return the set of SASL mechanisms this login module applies to
*
* @return the set of SASL mechanisms this login module applies to
*/
Set<String> mechanisms();
/**
* Return a new {@code CallbackHandler} instance appropriate for this login
* module when one of its supported mechanisms as returned by
* {@link #mechanisms()} is the SASL mechanism for inter-broker communication.
*
* @return a new {@code CallbackHandler} instance appropriate for this login
* module when one of its supported mechanisms as returned by
* {@link #mechanisms()} is the SASL mechanism for inter-broker
* communication.
*/
CallbackHandler newCallbackHandler();
} |
| Code Block | ||||||
| language | java | title | ||||
package org.apache.kafka.common.security.oauthbearer;
/**
* A {@code Callback} for use by the {@code SaslServer} implementation when it
* needs to provide an OAuth 2 bearer token compact serialization for
* validation. Callback handlers should use the
* {@link #error(String, String, String)} method to communicate errors back to
* the SASL Client as per
* <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
* 2.0 Authorization Framework</a> and the <a href=
* "https://www.iana.org/assignments/oauth-parameters/oauth-parameters.xhtml#extensions-error">IANA
* OAuth Extensions Error Registry</a>. Callback handlers should communicate
* other problems by raising an {@code IOException}.
*/
public class OAuthBearerValidatorCallback implements Callback {
private final String tokenValue;
private OAuthBearerToken token = null;
private String errorStatus = null;
private String errorScope = null;
private String errorOpenIDConfiguration = null;
/**
* Constructor
*
* @param tokenValue
* the mandatory/non-blank token value
*/
public OAuthBearerValidatorCallback(String tokenValue) {
if (Objects.requireNonNull(tokenValue).isEmpty())
throw new IllegalArgumentException("token value must not be empty");
this.tokenValue = tokenValue;
}
/**
* Return the (always non-null) token value
*
* @return the (always non-null) token value
*/
public String tokenValue() {
return tokenValue;
}
/**
* Return the (potentially null) token
*
* @return the (potentially null) token
*/
public OAuthBearerToken token() {
return token;
}
/**
* Return the (potentially null) error status value as per
* <a href="https://tools.ietf.org/html/rfc7628#section-3.2.2">RFC 7628: A Set
* of Simple Authentication and Security Layer (SASL) Mechanisms for OAuth</a>
* and the <a href=
* "https://www.iana.org/assignments/oauth-parameters/oauth-parameters.xhtml#extensions-error">IANA
* OAuth Extensions Error Registry</a>.
*
* @return the (potentially null) error status value
*/
public String errorStatus() {
return errorStatus;
}
/**
* Return the (potentially null) error scope value as per
* <a href="https://tools.ietf.org/html/rfc7628#section-3.2.2">RFC 7628: A Set
* of Simple Authentication and Security Layer (SASL) Mechanisms for OAuth</a>.
*
* @return the (potentially null) error scope value
*/
public String errorScope() {
return errorScope;
}
/**
* Return the (potentially null) error openid-configuration value as per
* <a href="https://tools.ietf.org/html/rfc7628#section-3.2.2">RFC 7628: A Set
* of Simple Authentication and Security Layer (SASL) Mechanisms for OAuth</a>.
*
* @return the (potentially null) error openid-configuration value
*/
public String errorOpenIDConfiguration() {
return errorOpenIDConfiguration;
}
/**
* Set the token. The token value is unchanged and is expected to match the
* provided token's value. All error values are cleared.
*
* @param token
* the mandatory token to set
*/
public void token(OAuthBearerToken token) {
this.token = Objects.requireNonNull(token);
this.errorStatus = null;
this.errorScope = null;
this.errorOpenIDConfiguration = null;
}
/**
* Set the error values as per
* <a href="https://tools.ietf.org/html/rfc7628#section-3.2.2">RFC 7628: A Set
* of Simple Authentication and Security Layer (SASL) Mechanisms for OAuth</a>.
* Any token is cleared.
*
* @param errorStatus
* the mandatory error status value from the <a href=
* "https://www.iana.org/assignments/oauth-parameters/oauth-parameters.xhtml#extensions-error">IANA
* OAuth Extensions Error Registry</a> to set
* @param errorScope
* the optional error scope value to set
* @param errorOpenIDConfiguration
* the optional error openid-configuration value to set
*/
public void error(String errorStatus, String errorScope, String errorOpenIDConfiguration) {
if (Objects.requireNonNull(errorStatus).isEmpty())
throw new IllegalArgumentException("error status must not be empty");
this.errorStatus = errorStatus;
this.errorScope = errorScope;
this.errorOpenIDConfiguration = errorOpenIDConfiguration;
this.token = null;
}
} |
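The token-or-error contract of the callback above can be sketched as follows. Because the Kafka classes are not reproduced here, the sketch uses a hypothetical stand-in (CallbackStandIn) that mirrors only the mutual exclusion between a validated result and an error status, and represents the validated token by its principal name. The validation step is passed in as a function, and the "invalid_token" status is the value registered for bearer token failures; everything else is an assumption made for illustration.

```java
import java.util.Objects;
import java.util.function.Function;

public final class ValidatorFlowSketch {
    private ValidatorFlowSketch() {
    }

    /** Hypothetical stand-in for the callback: holds either a result or an error, never both. */
    public static final class CallbackStandIn {
        private final String tokenValue;
        private String principal;   // stands in for the validated OAuthBearerToken
        private String errorStatus;

        public CallbackStandIn(String tokenValue) {
            if (Objects.requireNonNull(tokenValue).isEmpty())
                throw new IllegalArgumentException("token value must not be empty");
            this.tokenValue = tokenValue;
        }

        public String tokenValue() { return tokenValue; }
        public String principal() { return principal; }
        public String errorStatus() { return errorStatus; }

        /** Record a successful validation; any error is cleared. */
        public void token(String principal) {
            this.principal = Objects.requireNonNull(principal);
            this.errorStatus = null;
        }

        /** Record a failed validation; any earlier result is cleared. */
        public void error(String errorStatus) {
            if (Objects.requireNonNull(errorStatus).isEmpty())
                throw new IllegalArgumentException("error status must not be empty");
            this.errorStatus = errorStatus;
            this.principal = null;
        }
    }

    /**
     * Hypothetical handler logic: run the supplied validator against the compact
     * serialization and report the outcome on the callback.
     */
    public static void handle(CallbackStandIn callback, Function<String, String> validator) {
        try {
            callback.token(validator.apply(callback.tokenValue()));
        } catch (IllegalArgumentException e) {
            // Validation failures are reported to the client rather than thrown.
            callback.error("invalid_token");
        }
    }
}
```

A real handler would report problems unrelated to the token itself (for example, being unable to reach a key server) by raising an exception instead of setting an error status, as the class documentation above describes.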
Summary of Configuration
The following table summarizes the proposed configuration for Kafka's SASL/OAUTHBEARER implementation.
| Configuration | Example Value | Likelihood of Value Being Different from the Example Value | Notes |
|---|---|---|---|
| JAAS Login Module |
| Low | |
Non-Broker:
Broker:
|
| High | Default implementation creates an unsecured OAuth Bearer Token |
|
| High | Default implementation validates an unsecured OAuth Bearer Token |
| JAAS Module Options | Varied | High | Used to configure the above sasl.login and sasl.server callback handlers. |
| producer/consumer/broker configs: sasl.login.refresh.* | Varied, though defaults are reasonable | Low | |
OAuth Bearer Tokens
Every OAuth Bearer Token is a string representing an authorization issued to the client. There are at least two types of OAuth Bearer Tokens: an opaque token and a JSON Web Token (JWT). Nothing can be said about the contents of an opaque token other than that the contents are only available via some third party (e.g. an authorization server). Every JWT contains both a JSON Object Signing and Encryption (JOSE) Header and a JWT Claims Set. There are two distinct types of JWTs: a JSON Web Signature (JWS) and a JSON Web Encryption (JWE). There is no intent to model a JWE with the initial implementation.
...
| language | java |
|---|---|
| title | org.apache.kafka.common.security.oauthbearer.OAuthBearerToken |
| collapse | true |
Summary for Production Use
To use SASL/OAUTHBEARER in a production scenario it is necessary to write two separate callback handlers implementing org.apache.kafka.common.security.auth.AuthenticateCallbackHandler:
- A login callback handler that can retrieve an OAuth 2 bearer token from the token endpoint and wrap that token as an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerToken. It must attempt to do this when it is asked to handle an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback.
- A SASL Server callback handler that can validate an OAuth 2 bearer token compact serialization and convert that compact serialization to an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerToken. It must attempt to do this when it is asked to handle an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback. Note that this may entail secure retrieval (and perhaps caching) of a public key along with digital signature verification if the token is a JSON Web Signature (JWS); validation of various token claims such as the 'iat' (Issued At), 'nbf' (Not Before), 'exp' (Expiration Time), 'aud' (Audience), and 'iss' (Issuer) claims if the token is a JSON Web Token (JWT); and/or decryption if the token is encrypted as a JSON Web Encryption (JWE).
See Summary of Configuration for details on how to declare these two classes to Kafka.
It is likely that the implementations of the above two callback handlers will leverage an open source JOSE (JSON Object Signing and Encryption) library. See https://jwt.io/ for a list of many of the available libraries.
OAuth 2 is a flexible framework that allows different installations to do things differently, so the principal name in Kafka could come from any claim in a JWT. Most of the time it would come from the 'sub' claim, but it could certainly come from another claim, or it could be only indirectly based on a claim value (maybe certain text would be trimmed or prefixed/suffixed). Because the OAuth 2 framework is flexible, we need to accommodate that flexibility – and the ability to plug in arbitrary implementations of the above two callback handler classes gives us the required flexibility. As an example, the SASL Server callback handler implementation could leverage an open source JOSE library to parse the compact serialization, retrieve the public key if it has not yet been retrieved, verify the digital signature, validate various token claims, and map the 'sub' claim to the OAuthBearerToken's principal name (which becomes the SASL authorization ID, which becomes the Kafka principal name). By writing the callback handler code we have complete flexibility to meet the requirements of any particular OAuth 2 installation.
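The claim-to-principal mapping described above might look like the following sketch. It assumes the claims have already been parsed and verified into a Map by whatever JOSE library the handler uses. The class name PrincipalNameSketch, the method name, and the optional prefix-trimming parameter are hypothetical and only illustrate the kind of installation-specific logic a custom callback handler could contain.

```java
import java.util.Map;

public final class PrincipalNameSketch {
    private PrincipalNameSketch() {
    }

    /**
     * Hypothetical mapping from a JWT claim to a Kafka principal name.
     *
     * @param claims       already-parsed and already-verified JWT claims
     * @param claimName    the claim holding the principal, for example "sub"
     * @param prefixToTrim optional text to strip from the front of the claim value
     * @return the non-empty principal name
     */
    public static String principalName(Map<String, Object> claims, String claimName,
            String prefixToTrim) {
        Object raw = claims.get(claimName);
        if (!(raw instanceof String) || ((String) raw).isEmpty())
            throw new IllegalArgumentException(
                    "claim '" + claimName + "' must be a non-empty String");
        String value = (String) raw;
        if (prefixToTrim != null && !prefixToTrim.isEmpty() && value.startsWith(prefixToTrim))
            value = value.substring(prefixToTrim.length());
        if (value.isEmpty())
            throw new IllegalArgumentException("principal name must not be empty after trimming");
        return value;
    }
}
```

An installation that takes the principal directly from 'sub' would pass "sub" and a null prefix; one that stores a realm-qualified name in another claim could pass that claim name and the realm prefix to strip.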
The following references may be helpful:
| URL | Description |
|---|---|
| https://tools.ietf.org/html/rfc6749 | RFC 6749: The OAuth 2.0 Authorization Framework |
| https://tools.ietf.org/html/rfc6750 | RFC 6750: The OAuth 2.0 Authorization Framework: Bearer Token Usage |
| https://tools.ietf.org/html/rfc7519 | RFC 7519: JSON Web Token (JWT) |
| https://tools.ietf.org/html/rfc7515 | RFC 7515: JSON Web Signature (JWS) |
| https://tools.ietf.org/html/rfc7516 | RFC 7516: JSON Web Encryption (JWE) |
| https://tools.ietf.org/html/rfc7628 | RFC 7628: A Set of Simple Authentication and Security Layer (SASL) Mechanisms for OAuth |
Proposed Changes
Thanks to KIP-86: Configurable SASL callback handlers, no changes to existing public interfaces are required – all functionality represents additions rather than changes. The only changes to the existing implementation are to define appropriate default callback handler/login classes for SASL/OAUTHBEARER
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer;
/**
* A JSON Web Token, which may either be a JWS or a JWE. Neither signature
* validation nor decryption are performed here.
*
* @see <a href="https://tools.ietf.org/html/rfc7519">RFC 7519</a>
*/
public abstract class OAuthBearerJwt implements OAuthBearerToken {
/**
* Constructor with the principal taken from the '{@code sub}' claim (if any)
* and no scope claim name
*
* @param compactSerialization
* the compact serialization to parse as a JWT
* @throws OAuthBearerIllegalTokenException
* if compactSerialization cannot be either a JWS or JWE (due to
* there not being either 3 or 5 dot-separated values; or the JWT
* header either not being a valid Base64 URL-encoded value or not
* valid JSON after decoding)
*/
public OAuthBearerJwt(String compactSerialization) throws OAuthBearerIllegalTokenException {
this(compactSerialization, JwtClaim.SUBJECT.claimName(), null);
}
/**
* Constructor with the given principal and scope claim names
*
* @param compactSerialization
* the compact serialization to parse as a JWT
* @param principalClaimName
* the optional principal claim name
* @param scopeClaimName
* the optional scope claim name
* @throws OAuthBearerIllegalTokenException
* if compactSerialization cannot be either a JWS or JWE (due to
* there not being either 3 or 5 dot-separated values; or the JWT
* header either not being a valid Base64 URL-encoded value or not
* valid JSON after decoding)
*/
public OAuthBearerJwt(String compactSerialization, String principalClaimName, String scopeClaimName)
throws OAuthBearerIllegalTokenException {
// etc...
}
/**
* Return the 3 or 5 dot-separated sections of the JWT compact serialization
*
* @return the 3 or 5 dot-separated sections of the JWT compact serialization
*/
public List<String> splits() {
return splits;
}
/**
* Return the JOSE Header as a {@code Map}
*
* @return the JOSE header
*/
public Map<String, Object> header() {
return header;
}
/**
* Return the JWT Claim Set as a {@code Map}
*
* @return the (always non-null but possibly empty) claims
*/
public abstract Map<String, Object> claims();
/**
* Return the scope claim name, if any, otherwise null
*
* @return the scope claim name, if any, otherwise null
*/
public String scopeClaimName() {
return scopeClaimName;
}
/**
* Indicate if the claim exists and is the given type
*
* @param claimName
* the mandatory JWT claim name
* @param type
* the mandatory type, which should either be String.class,
* Number.class, or List.class
* @return true if the claim exists and is the given type, otherwise false
*/
public boolean isClaimType(String claimName, Class<?> type) {
// etc...
}
/**
* Extract a claim of the given type
*
* @param claimName
* the mandatory JWT claim name
* @param type
* the mandatory type, which must either be String.class,
* Number.class, or List.class
* @return the claim if it exists, otherwise null
* @throws OAuthBearerIllegalTokenException
* if the claim exists but is not the given type
*/
public <T> T getClaim(String claimName, Class<T> type) throws OAuthBearerIllegalTokenException {
// etc...
}
/**
* Extract a claim that could be either a String or a String List as a String
* List (if it was a String it will be returned as a list of size 1).
*
* @param claimName
* the mandatory JWT claim name
* @return the claim in the form of a String List, if it exists, otherwise null
* @throws OAuthBearerIllegalTokenException
* if the claim exists but is neither a String nor a List
*/
public List<String> getListClaimFromStringOrList(String claimName) throws OAuthBearerIllegalTokenException {
// etc...
}
/**
* Extract a claim in its raw form
*
* @param claimName
* the mandatory JWT claim name
* @return the raw claim value, if it exists, otherwise null
*/
public Object getRawClaim(String claimName) {
return claims().get(Objects.requireNonNull(claimName));
}
/**
* Return the
* <a href="https://tools.ietf.org/html/rfc7519#section-4.1.3">Audience</a>
* claim as a String List
*
* @return the <a href=
* "https://tools.ietf.org/html/rfc7519#section-4.1.3">Audience</a>
* claim as a String List if available, otherwise null. An Audience
* claim that is a String will be returned as a List of size 1.
* @throws OAuthBearerIllegalTokenException
* if the claim value is the incorrect type
*/
public List<String> audience() throws OAuthBearerIllegalTokenException {
return getListClaimFromStringOrList(JwtClaim.AUDIENCE.claimName());
}
/**
* Return the
* <a href="https://tools.ietf.org/html/rfc7519#section-4.1.4">Expiration
* Time</a> claim
*
* @return the <a href=
* "https://tools.ietf.org/html/rfc7519#section-4.1.4">Expiration
* Time</a> claim if available, otherwise null
* @throws OAuthBearerIllegalTokenException
* if the claim value is the incorrect type
*/
public Number expirationTime() throws OAuthBearerIllegalTokenException {
return getClaim(JwtClaim.EXPIRATION_TIME.claimName(), Number.class);
}
/**
* Return the <a href="https://tools.ietf.org/html/rfc7519#section-4.1.6">Issued
* At</a> claim
*
* @return the
* <a href= "https://tools.ietf.org/html/rfc7519#section-4.1.6">Issued
* At</a> claim if available, otherwise null
* @throws OAuthBearerIllegalTokenException
* if the claim value is the incorrect type
*/
public Number issuedAt() throws OAuthBearerIllegalTokenException {
return getClaim(JwtClaim.ISSUED_AT.claimName(), Number.class);
}
/**
* Return the
* <a href="https://tools.ietf.org/html/rfc7519#section-4.1.1">Issuer</a> claim
*
* @return the <a href=
* "https://tools.ietf.org/html/rfc7519#section-4.1.1">Issuer</a> claim
* if available, otherwise null
* @throws OAuthBearerIllegalTokenException
* if the claim value is the incorrect type
*/
public String issuer() throws OAuthBearerIllegalTokenException {
return getClaim(JwtClaim.ISSUER.claimName(), String.class);
}
/**
* Return the <a href="https://tools.ietf.org/html/rfc7519#section-4.1.7">JWT
* ID</a> claim
*
* @return the <a href= "https://tools.ietf.org/html/rfc7519#section-4.1.7">JWT
* ID</a> claim if available, otherwise null
* @throws OAuthBearerIllegalTokenException
* if the claim value is the incorrect type
*/
public String jwtId() throws OAuthBearerIllegalTokenException {
return getClaim(JwtClaim.JWT_ID.claimName(), String.class);
}
/**
* Return the <a href="https://tools.ietf.org/html/rfc7519#section-4.1.5">Not
* Before</a> claim
*
* @return the <a href= "https://tools.ietf.org/html/rfc7519#section-4.1.5">Not
* Before</a> claim if available, otherwise null
* @throws OAuthBearerIllegalTokenException
* if the claim value is the incorrect type
*/
public Number notBefore() throws OAuthBearerIllegalTokenException {
return getClaim(JwtClaim.NOT_BEFORE.claimName(), Number.class);
}
/**
* Return the
* <a href="https://tools.ietf.org/html/rfc7519#section-4.1.2">Subject</a> claim
*
* @return the <a href=
* "https://tools.ietf.org/html/rfc7519#section-4.1.2">Subject</a> claim
* if available, otherwise null
* @throws OAuthBearerIllegalTokenException
* if the claim value is the incorrect type
*/
public String subject() throws OAuthBearerIllegalTokenException {
return getClaim(JwtClaim.SUBJECT.claimName(), String.class);
}
/**
* Decode the given Base64URL-encoded value, parse the resulting JSON as a JSON
* object, and return the map of member names to their values (each value being
* represented as either a String, a Number, or a List of Strings).
*
* @param split
* the value to decode and parse
* @return the map of JSON member names to their String, Number, or String List
* value
* @throws OAuthBearerIllegalTokenException
* if the given Base64URL-encoded value cannot be decoded or parsed
*/
public static Map<String, Object> toMap(String split) throws OAuthBearerIllegalTokenException {
// etc...
}
} |
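The "3 or 5 dot-separated sections" rule mentioned in the class above can be illustrated with a short sketch. It is not the proposed OAuthBearerJwt implementation; the class and method names are hypothetical, and it uses only java.util.Base64 to split a compact serialization and decode the JOSE header to its JSON text, leaving JSON parsing to a library.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;

public final class CompactSerializationSketch {
    private CompactSerializationSketch() {
    }

    /** Split into sections, keeping empty ones such as the signature of an unsecured JWS. */
    public static List<String> splits(String compactSerialization) {
        // A limit of -1 keeps trailing empty strings, so "header.payload." yields 3 sections.
        String[] parts = compactSerialization.split("\\.", -1);
        if (parts.length != 3 && parts.length != 5)
            throw new IllegalArgumentException(
                    "expected 3 (JWS) or 5 (JWE) dot-separated sections but found " + parts.length);
        return Arrays.asList(parts);
    }

    /** Three sections indicate a JWS; five indicate a JWE. */
    public static boolean looksLikeJws(String compactSerialization) {
        return splits(compactSerialization).size() == 3;
    }

    /** Base64URL-decode the first section and return the JOSE header as JSON text. */
    public static String decodedHeaderJson(String compactSerialization) {
        // The JDK URL decoder accepts input without '=' padding, as JWTs use.
        byte[] decoded = Base64.getUrlDecoder().decode(splits(compactSerialization).get(0));
        return new String(decoded, StandardCharsets.UTF_8);
    }
}
```

Passing -1 as the split limit matters for unsecured tokens: their signature section is empty, and the default split would silently drop it and report only two sections.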
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer;
/**
* @see <a href="https://tools.ietf.org/html/rfc7515">RFC 7515</a>. Note that
* digital signature validation is not performed.
*/
public class OAuthBearerJws extends OAuthBearerJwt {
/**
* Constructor with no scope claim name
*
* @param compactSerialization
* the compact serialization to parse as a JWT
* @throws OAuthBearerIllegalTokenException
* if the compact serialization is not a valid JWS (meaning it did
* not have 3 dot-separated Base64URL sections; or the header or
* claims either are not valid Base 64 URL encoded values or are not
* JSON after decoding; or the existence or absence of a digital
* signature is inconsistent with the mandatory 'alg' header value)
*/
public OAuthBearerJws(String compactSerialization) throws OAuthBearerIllegalTokenException {
this(compactSerialization, JwtClaim.SUBJECT.claimName(), null);
}
/**
* Constructor with the given principal and scope claim names
*
* @param compactSerialization
* the compact serialization to parse as a JWS
* @param principalClaimName
* the optional principal claim name
* @param scopeClaimName
* the optional scope claim name
* @throws OAuthBearerIllegalTokenException
* if compactSerialization cannot be either a JWS or JWE (due to
* there not being either 3 or 5 dot-separated values; or the JWT
* header either not being a valid Base64 URL-encoded value or not
* valid JSON after decoding)
*/
public OAuthBearerJws(String compactSerialization, String principalClaimName, String scopeClaimName)
throws OAuthBearerIllegalTokenException {
// etc...
}
} |
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer;
/**
* JSON Web Signature and Encryption Header Parameters
*
* @see <a href="https://www.iana.org/assignments/jose/jose.xhtml">JSON Web
* Signature and Encryption Header Parameters</a>
*/
public enum JwtHeaderParameter {
ALGORITHM("alg"),
JWK_SET_URL("jku"),
JSON_WEB_KEY("jwk"),
KEY_ID("kid"),
X_509_URL("x5u"),
X_509_CERTIFICATE_CHAIN("x5c"),
X_509_CERTIFICATE_SHA1_THUMBPRINT("x5t"),
X_509_CERTIFICATE_SHA256_THUMBPRINT("x5t#S256"),
TYPE("typ"),
CONTENT_TYPE("cty"),
CRITICAL("crit"),
ENCRYPTION_ALGORITHM("enc"),
COMPRESSION_ALGORITHM("zip"),
ISSUER("iss"),
SUBJECT("sub"),
AUDIENCE("aud");
private String headerParameterName;
private JwtHeaderParameter(String headerParameterName) {
this.headerParameterName = headerParameterName;
}
public String headerParameterName() {
return headerParameterName;
}
} |
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer;
/**
* JSON Web Token Claims
*
* @see <a href="https://www.iana.org/assignments/jwt/jwt.xhtml">JSON Web Token
* Claims</a>
*/
public enum JwtClaim {
ISSUER("iss"),
SUBJECT("sub"),
AUDIENCE("aud"),
EXPIRATION_TIME("exp"),
NOT_BEFORE("nbf"),
ISSUED_AT("iat"),
JWT_ID("jti");
private String claimName;
private JwtClaim(String claimName) {
this.claimName = claimName;
}
public String claimName() {
return claimName;
}
} |
Substitutable Module Options
The mechanics of token retrieval and token validation (both described later) will differ across OAuth deployments. For example, for token retrieval, each deployment will inject credentials to the token endpoint differently, and the parameters sent to the token endpoint may also differ. Token validation will also differ because OAuth supports multiple methods of validation. Configuration of the retrieval and validation mechanisms, which are done via JAAS configuration, must therefore be flexible. In particular, while we collaboratively use instances implementing javax.security.auth.Callback and javax.security.auth.CallbackHandler to retrieve information, we can't know in advance what information will be required in order to retrieve or validate a token; a username and password might – or might not – be required, for example, and retrieval and validation will likely require much more as well. We also don't know where this information will come from: a file? an environment variable? a database? Somewhere else? We need implementations of the Callback and CallbackHandler interfaces that are just as flexible as we need the JAAS configuration to be.
JAAS config options (each of which is an individual element of the space-separated ModuleOptions list), in combination with appropriate Callback and CallbackHandler implementations, will support arbitrarily complex retrieval and substitution inside the option value. The JAAS Configuration spec already supports system property substitution via the ${system.property} syntax; we will implement support for arbitrarily complex substitutions. For example, the following would support substitution of the contents of a file (which is a common way to store secrets, especially within containers) into the option value:
thePassword="$[file/doNotLog/notBlank/=/path/to/secrets/the_secret]"
There are several features here that deserve comment:
- The "$[" and "]" delimiters are the signal to perform a substitution (we can't use "${" and "}" because that is already defined by the JAAS Configuration spec to mean system property substitution). Note that to perform substitution within a substitution we will also support "$[[" and "]]" as delimiters (e.g. "prefix_$[[file/defaultValue=$[envVar=ENVIRON_VALUE]/=/the/path]]", where this nesting capability seems most likely to be used to specify default values as indicated).
- Immediately inside the opening delimiter is the type of substitution followed by any constraints we wish to apply. In the above, we identify this as a file substitution and we indicate two constraints: the resulting value should never be logged (i.e. store it as an instance of org.apache.kafka.common.config.types.Password instead of as a String); and the contents must not be blank (meaning it must not be empty or only contain whitespace). It is an error if any of the specified constraints are violated.
- Immediately after the type of substitution and any optional constraints is an equal sign ("=") followed by the value (which in the case of the "file" type is interpreted as the filename to read); then ultimately the closing delimiter appears.
This scheme is flexible and powerful; it handles most cases, but it remains relatively easy to create and read. Importantly, the types of replacements can be expanded in the future without breaking compatibility.
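The structure described above (type, optional constraints, "=", value) can be illustrated with a minimal parsing sketch. It is an assumption-laden illustration rather than the proposed implementation: the class name SubstitutionSpec is hypothetical, only a single non-nested "$[...]" token is handled, and the sketch assumes that "/=" separates the constraints from the value whenever constraints are present.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper (not part of this KIP): parses one non-nested "$[...]" token. */
final class SubstitutionSpec {
    final String type;              // e.g. "file"
    final List<String> constraints; // e.g. ["doNotLog", "notBlank"]
    final String value;             // e.g. the filename for the "file" type

    private SubstitutionSpec(String type, List<String> constraints, String value) {
        if (type.isEmpty())
            throw new IllegalArgumentException("Missing substitution type");
        this.type = type;
        this.constraints = Collections.unmodifiableList(constraints);
        this.value = value;
    }

    static SubstitutionSpec parse(String token) {
        if (!token.startsWith("$[") || !token.endsWith("]"))
            throw new IllegalArgumentException("Not a substitution token: " + token);
        String inner = token.substring(2, token.length() - 1);
        // Assumption: with constraints present, the value starts after the first "/=".
        int slashEq = inner.indexOf("/=");
        if (slashEq >= 0) {
            String[] head = inner.substring(0, slashEq).split("/");
            List<String> constraints = new ArrayList<>(Arrays.asList(head).subList(1, head.length));
            return new SubstitutionSpec(head[0], constraints, inner.substring(slashEq + 2));
        }
        // Without constraints the form is simply type=value.
        int eq = inner.indexOf('=');
        if (eq < 0)
            throw new IllegalArgumentException("Missing '=' in: " + token);
        return new SubstitutionSpec(inner.substring(0, eq), new ArrayList<>(), inner.substring(eq + 1));
    }
}
```

Parsing the example option value yields the type "file", the constraints "doNotLog" and "notBlank", and the value "/path/to/secrets/the_secret". Nested "$[[" and "]]" delimiters would need a real tokenizer and are deliberately left out here.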
How would we retrieve the above value? We define the following representation of the module options and their substitution state along with associated Callback and CallbackHandler implementations:
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer.smo;
/**
* Holds state regarding which {@code LoginModule} <a href=
* "https://docs.oracle.com/javase/9/docs/api/javax/security/auth/login/Configuration.html">module
* options</a> have been substituted. Instances of this class are thread-safe.
*
* @see SubstitutableModuleOptionsCallbackHandler
* @see SubstitutableModuleOptionsCallback
*/
public class SubstitutableModuleOptions {
/**
* Constructor
*
* @param moduleOptionsMap
* the mandatory map representation of the <a href=
* "https://docs.oracle.com/javase/9/docs/api/javax/security/auth/login/Configuration.html">module
* options</a>
*/
public SubstitutableModuleOptions(Map<String, String> moduleOptionsMap) {
this.moduleOptionsMap = Collections.unmodifiableMap(Objects.requireNonNull(moduleOptionsMap));
}
/**
* Return (an unmodifiable copy of) the original module options map provided
* during construction
*
* @return (an unmodifiable copy of) the original module options map provided
* during construction
*/
public Map<String, String> moduleOptionsMap() {
return moduleOptionsMap;
}
/**
* Return an unmodifiable map identifying which module options have been
* processed for substitution and the resulting value after substitution (if
* any) was applied. A module option is guaranteed to have been processed for
* substitution and its name will appear as a key in the returned map only after
* {@link #setSubstitutionValue(String, String)} or
* {@link #setSubstitutionValue(String, Password)} is called.
*
* @return an unmodifiable map identifying which module options have been
* processed for substitution and the resulting value after substitution
* (if any) was applied
*/
public Map<String, Object> moduleOptionSubstitutionState() {
return Collections.unmodifiableMap(moduleOptionSubstitutionState);
}
/**
* Identify that the option with the given name has had substitution performed
* for it resulting in the given non-password value. This method can be
* successfully invoked (and is idempotent) only if invoked before
* {@link #setSubstitutionValue(String, Password)}; the option value cannot be
* changed.
*
* @param optionName
* the mandatory option name, which must exist in the map returned by
* {@link #moduleOptionsMap()}
* @param substitutionValue
* the mandatory substitution value to set
*/
public void setSubstitutionValue(String optionName, String substitutionValue) {
setSubstitutionValueInternal(Objects.requireNonNull(optionName), Objects.requireNonNull(substitutionValue));
}
/**
* Identify that the option with the given name has had substitution performed
* for it resulting in the given password-related value. This method can be
* successfully invoked (and is idempotent) only if invoked before
* {@link #setSubstitutionValue(String, String)}; the option value cannot be
* changed.
*
* @param optionName
* the mandatory option name, which must exist in the map returned by
* {@link #moduleOptionsMap()}
* @param substitutionValue
* the mandatory substitution value to set
*/
public void setSubstitutionValue(String optionName, Password substitutionValue) {
setSubstitutionValueInternal(Objects.requireNonNull(optionName), Objects.requireNonNull(substitutionValue));
}
// etc..
} |
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer.smo;
/**
* A {@code Callback} related to introspection requests against a JAAS
* configuration
*
* @see SubstitutableModuleOptionsCallbackHandler
* @see SubstitutableModuleOptions
*/
public class SubstitutableModuleOptionsCallback implements Callback {
/**
* Constructor
*
* @param substitutableModuleOptions
* the mandatory substitutable module options
* @param optionName
* the requested option
* @param optionRequiredToExist
* if true then the requested option is required to exist
*/
public SubstitutableModuleOptionsCallback(SubstitutableModuleOptions substitutableModuleOptions, String optionName,
boolean optionRequiredToExist) {
// etc...
}
/**
* Return the substitutable module options provided at construction time
*
* @return the substitutable module options provided at construction time
*/
public SubstitutableModuleOptions substitutableModuleOptions() {
return substitutableModuleOptions;
}
/**
* Return the option name provided at construction time
*
* @return the option name provided at construction time
*/
public String optionName() {
return optionName;
}
/**
* Return true if the requested option is required to exist, otherwise false
*
* @return true if the requested option is required to exist, otherwise
* false
*/
public boolean optionRequiredToExist() {
return optionRequiredToExist;
}
/**
* Identify that the option identified by {@link #optionName()}, on the instance
* returned by {@link #substitutableModuleOptions()}, has had substitution
* performed for it resulting in the given non-password value. This method can
* be successfully invoked (and is idempotent) only when the option exists and
* its value isn't being changed.
*
* @param substitutionValue
* the mandatory non-password substitution value to set
*/
public void setSubstitutionValue(String substitutionValue) {
substitutableModuleOptions.setSubstitutionValue(optionName, Objects.requireNonNull(substitutionValue));
}
/**
* Identify that the option identified by {@link #optionName()}, on the instance
* returned by {@link #substitutableModuleOptions()}, has had substitution
* performed for it resulting in the given password-related value. This method
* can be successfully invoked (and is idempotent) only when the option exists
* and its value isn't being changed.
*
* @param substitutionValue
* the mandatory password-related substitution value to set
*/
public void setSubstitutionValue(Password substitutionValue) {
substitutableModuleOptions.setSubstitutionValue(optionName, Objects.requireNonNull(substitutionValue));
}
/**
* Return the substitution value, if any has been set, otherwise null. Any
* non-null value will be either a {@code String} or a {@code Password}. Note
* that the value may not have been set via a call to
* {@link #setSubstitutionValue(Password)} or
* {@link #setSubstitutionValue(String)}; it is possible that the value was
* already set prior to construction of this instance, in which case that value
* will be returned here.
*
* @return the substitution value, if any has been set, otherwise null
*/
public Object substitutionValue() {
return substitutableModuleOptions.moduleOptionSubstitutionState().get(optionName);
}
/**
* Return the substitution text, if any has been set, otherwise null.
*
* @return the substitution text, if any has been set, otherwise null.
*/
public String substitutionText() {
Object substitutionValue = substitutionValue();
return substitutionValue instanceof Password ? ((Password) substitutionValue).value()
: (String) substitutionValue;
}
} |
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer.smo;
/**
* A {@code CallbackHandler} that handles introspection requests against a JAAS
* configuration
*
* @see SubstitutableModuleOptionsCallback
* @see SubstitutableModuleOptions
*/
public class SubstitutableModuleOptionsCallbackHandler implements AuthenticateCallbackHandler {
@Override
public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
for (Callback callback : callbacks) {
if (callback instanceof SubstitutableModuleOptionsCallback) {
SubstitutableModuleOptionsCallback substitutableModuleOptionsCallback = (SubstitutableModuleOptionsCallback) callback;
SubstitutableModuleOptions substitutableModuleOptions = substitutableModuleOptionsCallback
.substitutableModuleOptions();
String optionName = substitutableModuleOptionsCallback.optionName();
boolean optionRequiredToExist = substitutableModuleOptionsCallback.optionRequiredToExist();
boolean optionExists = substitutableModuleOptions.moduleOptionsMap().containsKey(optionName);
if (!optionExists) {
if (!optionRequiredToExist)
continue;
else
throw new IOException(String
.format("The requested option was required to exist but does not: %s", optionName));
}
Object substitutionValue = getSubstitutionValue(substitutableModuleOptions, optionName,
new HashSet<String>());
if (substitutionValue instanceof String)
substitutableModuleOptionsCallback.setSubstitutionValue((String) substitutionValue);
else
substitutableModuleOptionsCallback.setSubstitutionValue((Password) substitutionValue);
} else
throw new UnsupportedCallbackException(callback,
String.format("Unrecognized Callback type: %s", callback.getClass().getName()));
}
}
/**
* Return the server configuration provided during
* {@link #configure(Map, String, List)}, if any, otherwise null
*
* @return the server configuration provided during
* {@link #configure(Map, String, List)}, if any, otherwise null
*/
public Map<String, ?> serverConfig() {
return serverConfig;
}
/**
* Return the SASL mechanism provided during
* {@link #configure(Map, String, List)}, if any, otherwise null
*
* @return the SASL mechanism provided during
* {@link #configure(Map, String, List)}, if any, otherwise null
*/
public String mechanism() {
return mechanism;
}
/**
* Return the JAAS login module configurations provided during
* {@link #configure(Map, String, List)}, if any, otherwise null
*
* @return the JAAS login module configurations provided during
* {@link #configure(Map, String, List)}, if any, otherwise null
*/
public List<AppConfigurationEntry> jaasConfigEntries() {
return jaasConfigEntries;
}
/**
* Convenience method to perform substitution without using callbacks and a
* callback handler.
*
* @param substitutableModuleOptions
* the mandatory substitutable module options to query
* @param optionName
* the requested option
* @param optionRequiredToExist
* if true, and the requested option does not exist, then an
* exception is raised; if false, and the requested option does not
* exist, then the empty string is returned.
* @return the given option value, after any required substitution is applied,
* or the empty string if the option does not exist and it was not
* required to exist
* @throws IOException
* if a required substitution cannot be performed (including if a
* required option does not exist)
*/
public static Object getSubstitutionValue(SubstitutableModuleOptions substitutableModuleOptions, String optionName,
boolean optionRequiredToExist) throws IOException {
boolean optionExists = substitutableModuleOptions.moduleOptionsMap().containsKey(optionName);
if (!optionExists) {
if (!optionRequiredToExist)
return "";
else
throw new IOException(
String.format("The requested option was required to exist but does not: %s", optionName));
}
return getSubstitutionValue(substitutableModuleOptions, optionName, new HashSet<String>());
}
/*
* Handle substitution, dealing with circular references, constraints, etc.
*/
private static Object getSubstitutionValue(SubstitutableModuleOptions substitutableModuleOptions, String optionName,
HashSet<String> optionsSeen) throws IOException {
// etc...
}
// etc...
} |
Given the above code, and assuming the existence of the following mapping in the module options map:
thePassword="$[file/doNotLog/notBlank/=/path/to/secrets/the_secret]"
we can retrieve the contents of the file as follows:
| Code Block | ||||
|---|---|---|---|---|
| ||||
SubstitutableModuleOptions options = new SubstitutableModuleOptions(moduleOptionsMap);
// The third argument marks the option as required to exist
SubstitutableModuleOptionsCallback callback = new SubstitutableModuleOptionsCallback(options, "thePassword", true);
CallbackHandler callbackHandler = new SubstitutableModuleOptionsCallbackHandler();
callbackHandler.handle(new Callback[] {callback});
String thePassword = callback.substitutionText(); |
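The private getSubstitutionValue overload shown earlier carries a set of option names already seen so that circular references can be detected. A minimal sketch of that idea follows. It is not the proposed implementation: the class name OptionReferenceResolver is hypothetical, and the sketch assumes that an option refers to another option using the whole-value form "$[option=otherName]" with no constraints.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper (not part of this KIP): follows option-to-option references. */
final class OptionReferenceResolver {
    // Assumed whole-value reference form, by analogy with "$[envVar=NAME]".
    private static final String PREFIX = "$[option=";

    private OptionReferenceResolver() {
    }

    static String resolve(Map<String, String> options, String optionName) {
        return resolve(options, optionName, new HashSet<>());
    }

    private static String resolve(Map<String, String> options, String optionName, Set<String> optionsSeen) {
        // If the name is already on the current resolution path we have a cycle.
        if (!optionsSeen.add(optionName))
            throw new IllegalStateException("Circular option reference involving: " + optionName);
        String raw = options.get(optionName);
        if (raw == null)
            throw new IllegalStateException("No such option: " + optionName);
        String result = raw;
        if (raw.startsWith(PREFIX) && raw.endsWith("]")) {
            String referenced = raw.substring(PREFIX.length(), raw.length() - 1);
            result = resolve(options, referenced, optionsSeen);
        }
        // Leaving this option: it is no longer on the resolution path.
        optionsSeen.remove(optionName);
        return result;
    }
}
```

Tracking the current resolution path, rather than every option ever visited, lets two options refer to the same third option without being reported as a cycle.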
The initial set of supported substitution types and their specifiable constraints are as follows:
| Type | Description | Specifiable Constraints | Notes |
|---|---|---|---|
| file | File content substitution | notBlank, notEmpty, optionalExist, doNotLog, defaultValue | It is an error if the file does not exist or is not readable unless optionalExist is specified. If a defaultValue is specified then optionalExist is implied; the default value is used when the previously-determined value violates a notBlank or notEmpty constraint, and the default value is itself checked against those constraints. If the filename or the default value depends on substitutions that were marked doNotLog then doNotLog is implied. |
| envvar | Environment variable substitution | notBlank, notEmpty, optionalExist, doNotLog, defaultValue | It is an error if the environment variable does not exist unless optionalExist is specified. If a defaultValue is specified then optionalExist is implied; the default value is used when the previously-determined value violates a notBlank or notEmpty constraint, and the default value is itself checked against those constraints. If the environment variable name or the default value depends on substitutions that were marked doNotLog then doNotLog is implied. |
| option | Another module option value substitution | notBlank, notEmpty, defaultValue | It is always an error if the specified option does not exist. If a defaultValue is specified, it is used when the previously-determined value violates a notBlank or notEmpty constraint, and the default value is itself checked against those constraints. If the option value (whether determined via a default value or not) depends on substitutions that were marked doNotLog then doNotLog is implied. |
Note that the above substitution functionality and the related classes and interfaces are not necessarily specific to OAuth Bearer Tokens, so an open question is whether this substitution-related code belongs in a sub-package of the main SASL/OAUTHBEARER one or if it should live somewhere else.
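The interaction between notBlank, notEmpty, optionalExist and defaultValue in the table above can be sketched as follows. This is an illustration under stated assumptions, not the proposed code: the class name SubstitutionConstraints is hypothetical, a null argument stands for "the file or environment variable does not exist", a missing value falls back to the default value when one is given, and a missing value with optionalExist but no default is assumed to resolve to the empty string.

```java
import java.util.Set;

/** Hypothetical helper (not part of this KIP): applies constraints to a resolved value. */
final class SubstitutionConstraints {
    private SubstitutionConstraints() {
    }

    /**
     * @param resolved the value found, or null if the source does not exist
     * @param constraints constraint names such as "notBlank" or "optionalExist"
     * @param defaultValue the default value, or null if none was specified
     */
    static String apply(String resolved, Set<String> constraints, String defaultValue) {
        // A default value implies optionalExist.
        boolean mayBeMissing = defaultValue != null || constraints.contains("optionalExist");
        if (resolved == null && !mayBeMissing)
            throw new IllegalArgumentException("Value does not exist and is not optional");
        String candidate = resolved;
        if (candidate == null || violates(candidate, constraints)) {
            if (defaultValue != null)
                candidate = defaultValue; // fall back to the default value
            else if (candidate == null)
                candidate = "";           // assumption: optional and missing means empty
        }
        // The final value, including a default, must satisfy the constraints.
        if (violates(candidate, constraints))
            throw new IllegalArgumentException("notBlank/notEmpty constraint violated");
        return candidate;
    }

    private static boolean violates(String value, Set<String> constraints) {
        if (constraints.contains("notEmpty") && value.isEmpty())
            return true;
        return constraints.contains("notBlank") && value.trim().isEmpty();
    }
}
```

The doNotLog constraint is not modelled here; in the proposal it determines whether the result is held as a Password rather than a String.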
Token Retrieval
Token retrieval occurs on the client side of the SASL negotiation (in the producer/consumer, or on the broker when SASL/OAUTHBEARER is the inter-broker protocol), and the org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule class is the LoginModule implementation that creates and invokes an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenRetriever to perform the retrieval. We provide the org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtRetriever class as a sample implementation that also provides value in development and testing situations.
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer;
import org.apache.kafka.common.security.oauthbearer.internal.OAuthBearerSaslClientProvider;
import org.apache.kafka.common.security.oauthbearer.internal.OAuthBearerSaslServerProvider;
/**
* A {@code LoginModule} for the SASL/OAUTHBEARER mechanism.
* <p>
* Example use on a non-broker client:
*
* <pre>
* KafkaClient {
* org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
* reloginAllowedBeforeLogout="true"
* tokenRetriever="org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtRetriever"
* stringClaim_sub="theClientPrincipalName"
* listClaim_scope="|scopeValue1|scopeValue2";
* };
* </pre>
*
* Example use on a broker:
*
* <pre>
* KafkaServer {
* org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
* reloginAllowedBeforeLogout="true"
* tokenRetriever="org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtRetriever"
* stringClaim_sub="theBrokerPrincipalName"
* listClaim_scope="|scopeValue1|scopeValue2"
* tokenValidator="org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtValidator"
* principalClaimName="sub";
* };
* </pre>
*
* @see RefreshConfigProp#RELOGIN_ALLOWED_BEFORE_LOGOUT
* @see OAuthBearerTokenRetriever
* @see OAuthBearerTokenValidator
*/
public class OAuthBearerLoginModule implements ExpiringCredentialLoginModule {
public static final String TOKEN_RETRIEVER_CLASS_NAME_OPTION = "tokenRetriever";
public static final String TOKEN_VALIDATOR_CLASS_NAME_OPTION = "tokenValidator";
private static final Set<String> MECHANISMS = Collections
.unmodifiableSet(new HashSet<>(Arrays.asList("OAUTHBEARER")));
static {
OAuthBearerSaslClientProvider.initialize(); // not part of public API
OAuthBearerSaslServerProvider.initialize(); // not part of public API
}
@Override
public Set<String> mechanisms() {
return MECHANISMS;
}
@Override
public CallbackHandler newCallbackHandler() {
return new SubstitutableModuleOptionsCallbackHandler();
}
// etc...
} |
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer;
/**
* An implementation of this interface must be configured for broker and
* non-broker clients via JAAS when using the {@link OAuthBearerLoginModule}.
* The configuration is done via the
* {@value OAuthBearerLoginModule#TOKEN_RETRIEVER_CLASS_NAME_OPTION} option.
*/
public interface OAuthBearerTokenRetriever {
/**
* Retrieve a token using the given callback handler and JAAS login module
* options.
*
* @param callbackHandler
* the mandatory callback handler. It will typically be capable of
* handling instances of {@link SubstitutableModuleOptionsCallback},
* though different implementations of this interface are free to
* differ in their requirements.
* @param moduleOptionsMap
* the mandatory map representation of the <a href=
* "https://docs.oracle.com/javase/9/docs/api/javax/security/auth/login/Configuration.html">module
* options</a>
* @return the retrieved {@link OAuthBearerToken}. Failures are never
* reported via the return value: an unavailable remote resource, a
* configuration problem, or any other failure (for example, the
* token endpoint rejecting the provided credentials) results in an
* exception as described below.
* @throws IOException
* if one or more networked resources required to perform the
* retrieval (e.g. a web service) is unavailable.
* @throws UnsupportedCallbackException
* if the provided {@code CallbackHandler} cannot handle
* {@link SubstitutableModuleOptionsCallback}.
* @throws OAuthBearerConfigException
* if there is a configuration problem that prevents this instance
* from functioning (a missing mandatory parameter, for example)
* @throws LoginException
* if the retrieval fails for any other reason (if the token
* endpoint rejects the provided credentials, for example)
*/
OAuthBearerToken retrieve(CallbackHandler callbackHandler, Map<String, String> moduleOptionsMap)
throws IOException, UnsupportedCallbackException, OAuthBearerConfigException, LoginException;
} |
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer;
/**
* An implementation of {@link OAuthBearerTokenRetriever} that generates an
* unsecured JWT. Claims and their values can be specified using
* stringClaim_<claimname>, numberClaim_<claimname>, and
* listClaim_<claimname> options. The first character of the value is
* taken as the delimiter for list claims. You may define any claim name and
* value except '{@code iat}' and '{@code exp}', both of which are calculated
* automatically.
* <p>
* This implementation also accepts the following options:
* <ul>
* <li><code>{@value #PRINCIPAL_CLAIM_NAME_OPTION}</code> set to a custom claim
* name if you wish the name of the String claim holding the principal name to
* be something other than <code>"sub"</code>.</li>
* <li><code>{@value #LIFETIME_SECONDS_OPTION}</code> set to an integer value if
* the token expiration is to be set to something other than the default value
* of 3600 seconds (which is 1 hour). The 'exp' claim reflects the expiration
* time.</li>
* <li><code>{@value #SCOPE_CLAIM_NAME_OPTION}</code> set to a custom claim name
* if you wish the name of the String or String List claim holding any token
* scope to be something other than "<code>scope</code>"</li>
* </ul>
* For example:
*
* <pre>
* KafkaClient {
* org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
* tokenRetriever="org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtRetriever"
* reloginAllowedBeforeLogout="true"
* stringClaim_sub="thePrincipalName"
* listClaim_scope="|scopeValue1|scopeValue2"
* lifetimeSeconds="60";
* };
*
* </pre>
*/
public class OAuthBearerUnsecuredJwtRetriever implements OAuthBearerTokenRetriever {
public static final String PRINCIPAL_CLAIM_NAME_OPTION = "principalClaimName";
public static final String LIFETIME_SECONDS_OPTION = "lifetimeSeconds";
public static final String SCOPE_CLAIM_NAME_OPTION = "scopeClaimName";
// etc...
} |
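To make the retriever's output concrete, the following sketch shows how an unsecured JWT of the kind described above might be assembled: a header declaring the "none" algorithm, a payload holding the claims, and an empty signature, each part base64url-encoded without padding. The class and method names are hypothetical, JSON is built by hand on the assumption that claim values need no escaping, and the sketch is not the code proposed by this KIP. It also shows the list-claim convention in which the first character of the option value is the delimiter.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical sketch (not part of this KIP): builds an unsecured JWT. */
final class UnsecuredJwtSketch {
    private UnsecuredJwtSketch() {
    }

    /** Splits a list claim option such as "|scopeValue1|scopeValue2". */
    static List<String> parseListClaim(String optionValue) {
        if (optionValue.length() < 2)
            return new ArrayList<>();
        // The first character of the value is the delimiter.
        String delimiter = optionValue.substring(0, 1);
        return Arrays.asList(optionValue.substring(1).split(Pattern.quote(delimiter)));
    }

    /** Returns header.payload. with an empty signature part. */
    static String serialize(String principal, List<String> scope, long issuedAtSeconds, long lifetimeSeconds) {
        StringBuilder scopeJson = new StringBuilder("[");
        for (int i = 0; i < scope.size(); i++)
            scopeJson.append(i == 0 ? "" : ",").append('"').append(scope.get(i)).append('"');
        scopeJson.append(']');
        String header = "{\"alg\":\"none\"}";
        // 'iat' and 'exp' are calculated rather than configured.
        String payload = "{\"sub\":\"" + principal + "\",\"iat\":" + issuedAtSeconds + ",\"exp\":"
                + (issuedAtSeconds + lifetimeSeconds) + ",\"scope\":" + scopeJson + "}";
        Base64.Encoder encoder = Base64.getUrlEncoder().withoutPadding();
        return encoder.encodeToString(header.getBytes(StandardCharsets.UTF_8)) + "."
                + encoder.encodeToString(payload.getBytes(StandardCharsets.UTF_8)) + ".";
    }
}
```

A real implementation would use a JSON library so that claim values are escaped correctly, and would honor the principalClaimName and scopeClaimName options rather than hard-coding "sub" and "scope".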
Token Validation
Token validation occurs on the broker side of the SASL negotiation, and the SaslServer implementation registered by org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule creates and invokes an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenValidator to perform the validation. We provide the org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtValidator class as a sample implementation that also provides value in development and testing situations. There are additional utility classes as shown in the above diagram that are also reusable with other implementations.
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer;
import javax.security.sasl.SaslServer;
/**
* The public interface of SaslServer implementations of SASL/OAUTHBEARER for
* Kafka. The validated access token will be available via the {@link #token()}
* method. The existence of this interface keeps the actual implementation out
* of the public API.
*/
public interface OAuthBearerSaslServer extends SaslServer {
/**
* Return the validated access token. Exposing the token here is necessary if we
* wish to make it available for authorization decisions.
*
* @return the validated access token
* @see <a href=
* "https://cwiki.apache.org/confluence/display/KAFKA/KIP-189%3A+Improve+principal+builder+interface+and+add+support+for+SASL">KIP-189:
* Improve principal builder interface and add support for SASL</a>
*/
public OAuthBearerToken token();
} |
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer;
/**
* An implementation of this interface must be configured for brokers via JAAS
* when using the {@link OAuthBearerLoginModule}. The configuration is done via
* the {@value OAuthBearerLoginModule#TOKEN_VALIDATOR_CLASS_NAME_OPTION} option.
*/
public interface OAuthBearerTokenValidator {
/**
* Validate a token using the given callback handler and JAAS login module
* options.
*
* @param tokenValue
* the <code>b64token</code> value as defined in
* <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750
* Section 2.1</a> to validate
* @param callbackHandler
* the mandatory callback handler. It will typically be capable of
* handling instances of {@link SubstitutableModuleOptionsCallback},
* though different implementations of this interface are free to
* differ in their requirements. Set the callback handler via the
* {@code sasl.server.callback.handler.class.map} option in the
* broker properties file.
* @param moduleOptionsMap
* the mandatory map representation of the <a href=
* "https://docs.oracle.com/javase/9/docs/api/javax/security/auth/login/Configuration.html">module
* options</a>
* @return the validated {@link OAuthBearerToken}. Failures are never
* reported via the return value: an unavailable remote resource, a
* configuration problem, or a token that cannot be parsed or
* otherwise fails validation results in an exception as described
* below.
* @throws IOException
* if one or more networked resources required to perform the
* validation (e.g. a web service) is unavailable.
* @throws UnsupportedCallbackException
* if the provided {@code CallbackHandler} cannot handle
* {@link SubstitutableModuleOptionsCallback}.
* @throws OAuthBearerConfigException
* if there is a configuration problem that prevents this instance
* from functioning (a missing mandatory parameter, for example)
* @throws OAuthBearerIllegalTokenException
* if there is a problem with the token itself (it cannot be parsed
* or it otherwise fails validation)
* @see BrokerSecurityConfigs#SASL_SERVER_CALLBACK_HANDLER_CLASS_MAP_DOC
*/
OAuthBearerToken validate(String tokenValue, CallbackHandler callbackHandler, Map<String, String> moduleOptionsMap)
throws IOException, UnsupportedCallbackException, OAuthBearerConfigException,
OAuthBearerIllegalTokenException;
} |
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer;
/**
* An implementation of {@link OAuthBearerTokenValidator} that validates a JWS
* without regard to its algorithm and signature. It requires there to be an
* <code>"exp" (Expiration Time)</code> claim of type Number. If
* <code>"iat" (Issued At)</code> or <code>"nbf" (Not Before)</code> claims are
* present each must be a number that precedes the Expiration Time claim, and if
* both are present the Not Before claim must not precede the Issued At claim.
* It also accepts the following options, none of which are required:
* <ul>
* <li><code>{@value #PRINCIPAL_CLAIM_NAME_OPTION}</code> set to a non-empty
* value if you wish a String claim holding a principal name to be checked for
* existence; the default is to perform no such check</li>
* <li><code>{@value #SCOPE_CLAIM_NAME_OPTION}</code> set to a custom claim name
* if you wish the name of the String/String List claim holding any token scope
* to be something other than <code>"scope"</code></li>
* <li><code>{@value #REQUIRED_SCOPE_OPTION}</code> set to a space-delimited
* list of scope values if you wish the String/String List claim holding the
* token scope to be checked to make sure it contains certain values</li>
* <li><code>{@value #ALLOWABLE_CLOCK_SKEW_MILLIS_OPTION}</code> set to a
* positive integer if you wish to allow up to that many milliseconds of clock
* skew (the default is 0)</li>
* </ul>
* For example:
*
* <pre>
* KafkaServer {
* org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
* reloginAllowedBeforeLogout="true"
* tokenRetriever="org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtRetriever"
* stringClaim_sub="thePrincipalName"
* listClaim_scope=",KAFKA_BROKER,LOGIN_TO_KAFKA"
* tokenValidator="org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtValidator"
* principalClaimName="sub"
* requiredScope="LOGIN_TO_KAFKA"
* allowableClockSkewMillis="3000";
* };
* </pre>
*/
public class OAuthBearerUnsecuredJwtValidator implements OAuthBearerTokenValidator {
public static final String PRINCIPAL_CLAIM_NAME_OPTION = "principalClaimName";
public static final String SCOPE_CLAIM_NAME_OPTION = "scopeClaimName";
public static final String REQUIRED_SCOPE_OPTION = "requiredScope";
public static final String ALLOWABLE_CLOCK_SKEW_MILLIS_OPTION = "allowableClockSkewMillis";
// etc...
} |
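The time-related and scope-related checks this validator performs can be sketched in a few lines. The comparisons follow the rules documented on OAuthBearerValidationUtils in this section; everything else is an assumption made for illustration: the class name ValidationSketch is hypothetical, claims are passed as plain numbers rather than read from an OAuthBearerJwt, results are booleans rather than OAuthBearerValidationResult instances, and IllegalArgumentException stands in for OAuthBearerConfigException.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch (not part of this KIP): time and scope checks. */
final class ValidationSketch {
    private ValidationSketch() {
    }

    private static void requireNonNegative(int allowableClockSkewMillis) {
        if (allowableClockSkewMillis < 0)
            throw new IllegalArgumentException("Allowable clock skew must not be negative");
    }

    /** Invalid if issued after the check time plus the allowable skew. */
    static boolean issuedAtOk(double iatSeconds, long whenCheckTimeMillis, int allowableClockSkewMillis) {
        requireNonNegative(allowableClockSkewMillis);
        return iatSeconds * 1000 <= whenCheckTimeMillis + allowableClockSkewMillis;
    }

    /** Invalid if the check time plus the allowable skew is before 'nbf'. */
    static boolean notBeforeOk(double nbfSeconds, long whenCheckTimeMillis, int allowableClockSkewMillis) {
        requireNonNegative(allowableClockSkewMillis);
        return whenCheckTimeMillis + allowableClockSkewMillis >= nbfSeconds * 1000;
    }

    /** Invalid if the check time minus the allowable skew is on or after 'exp'. */
    static boolean notExpired(double expSeconds, long whenCheckTimeMillis, int allowableClockSkewMillis) {
        requireNonNegative(allowableClockSkewMillis);
        return whenCheckTimeMillis - allowableClockSkewMillis < expSeconds * 1000;
    }

    /** For claim pairs that exist: nbf >= iat, exp > iat, exp > nbf. Null means absent. */
    static boolean timesConsistent(Double iat, Double nbf, Double exp) {
        if (iat != null && nbf != null && nbf < iat)
            return false;
        if (iat != null && exp != null && exp <= iat)
            return false;
        return nbf == null || exp == null || exp > nbf;
    }

    /** Every element of the space-delimited required scope must be in the token scope. */
    static boolean scopeSatisfied(Collection<String> tokenScope, String requiredScope) {
        String trimmed = requiredScope == null ? "" : requiredScope.trim();
        if (trimmed.isEmpty())
            return true; // nothing is required
        Set<String> required = new HashSet<>(Arrays.asList(trimmed.split("\\s+")));
        return tokenScope != null && tokenScope.containsAll(required);
    }
}
```

With allowableClockSkewMillis set to 3000, for example, a token whose 'exp' is 100 seconds after the epoch is still accepted at a check time of 102000 milliseconds, because 102000 minus 3000 is earlier than 100000.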
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer;
public class OAuthBearerValidationUtils {
/**
* Validate the given claim for existence and type. It can be required to exist
* in the given claims, and if it exists it must be one of the types indicated
*
* @param jwt
* the mandatory JWT to which the validation will be applied
* @param required
* true if the claim is required to exist
* @param claimName
* the required claim name identifying the claim to be checked
* @param allowedTypes
* one or more of {@code String.class}, {@code Number.class}, and
* {@code List.class} identifying the type(s) that the claim value is
* allowed to be if it exists
* @return the result of the validation
*/
public static OAuthBearerValidationResult validateClaimForExistenceAndType(OAuthBearerJwt jwt, boolean required,
String claimName, Class<?>... allowedTypes) {
// etc...
}
/**
* Validate the 'iat' (Issued At) claim. It can be required to exist in the
* given claims, and if it exists it must be a (potentially fractional) number
* of seconds since the epoch defining when the JWT was issued; it is a
* validation error if the Issued At time is after the time at which the check
* is being done (plus any allowable clock skew).
*
* @param jwt
* the mandatory JWT to which the validation will be applied
* @param required
* true if the claim is required to exist
* @param whenCheckTimeMillis
* the time relative to which the validation is to occur
* @param allowableClockSkewMillis
* non-negative number to take into account some potential clock skew
* @return the result of the validation
* @throws OAuthBearerConfigException
* if the given allowable clock skew is negative
*/
public static OAuthBearerValidationResult validateIssuedAt(OAuthBearerJwt jwt, boolean required,
long whenCheckTimeMillis, int allowableClockSkewMillis) throws OAuthBearerConfigException {
// etc...
}
/**
* Validate the 'nbf' (Not Before) claim. It can be required to exist in the
* given claims, and if it exists it must be a (potentially fractional) number
* of seconds since the epoch defining the point at which the JWT becomes valid.
* It is a validation error if the time at which the check is being done (plus
* any allowable clock skew) is before the Not Before time.
*
* @param jwt
* the mandatory JWT to which the validation will be applied
* @param required
* true if the claim is required to exist
* @param whenCheckTimeMillis
* the time relative to which the validation is to occur
* @param allowableClockSkewMillis
* non-negative number to take into account some potential clock skew
* @return the result of the validation
* @throws OAuthBearerConfigException
* if the given allowable clock skew is negative
*/
public static OAuthBearerValidationResult validateNotBefore(OAuthBearerJwt jwt, boolean required,
long whenCheckTimeMillis, int allowableClockSkewMillis) throws OAuthBearerConfigException {
// etc...
}
/**
* Validate the 'exp' (Expiration Time) claim. It can be required to exist in
* the given claims, and if it exists it must be a (potentially fractional)
* number of seconds since the epoch defining the point at which the JWT
* expires. It is a validation error if the time at which the check is being
* done (minus any allowable clock skew) is on or after the Expiration Time.
*
* @param jwt
* the mandatory JWT to which the validation will be applied
* @param required
* true if the claim is required to exist
* @param whenCheckTimeMillis
* the time relative to which the validation is to occur
* @param allowableClockSkewMillis
* non-negative number to take into account some potential clock skew
* @return the result of the validation
* @throws OAuthBearerConfigException
* if the given allowable clock skew is negative
*/
public static OAuthBearerValidationResult validateExpirationTime(OAuthBearerJwt jwt, boolean required,
long whenCheckTimeMillis, int allowableClockSkewMillis) throws OAuthBearerConfigException {
// etc...
}
/**
* Validate the 'iat' (Issued At), 'nbf' (Not Before), and 'exp' (Expiration
* Time) claims for internal consistency. For claim pairs that exist, the
* following must be true:
* <ul>
* <li>nbf >= iat</li>
* <li>exp > iat</li>
* <li>exp > nbf</li>
* </ul>
*
* @param jwt
* the mandatory JWT to which the validation will be applied
* @return the result of the validation
*/
public static OAuthBearerValidationResult validateTimeConsistency(OAuthBearerJwt jwt) {
// etc...
}
/**
* Validate the given scope against the required scope. Every required scope
* element (if any) must exist in the provided scope for the validation to
* succeed.
*
* @param token
* the token whose scope is to be validated
* @param requiredScope
* the optional required scope against which the given scope will be
* validated
* @return the result of the validation
* @throws IOException
* if one or more required networked resources (e.g. to re-hydrate
* an opaque token) is unavailable.
* @throws OAuthBearerIllegalTokenException
* if there is something fundamentally wrong with the token (if it
* is malformed, for example)
*/
public static OAuthBearerValidationResult validateScope(OAuthBearerToken token, List<String> requiredScope)
throws IOException, OAuthBearerIllegalTokenException {
// etc...
}
// etc...
private OAuthBearerValidationUtils() {
// empty
}
} |
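The time-based rules documented above (Issued At, Not Before, Expiration Time, and their mutual consistency) can be illustrated with a minimal stand-alone sketch. This is not the KIP's implementation: the class name TimeClaimSketch, its method names, the use of OptionalDouble for an optional claim, and the use of IllegalArgumentException in place of OAuthBearerConfigException are all assumptions made only for illustration.

```java
import java.util.OptionalDouble;

/** Hypothetical sketch of the documented time-claim rules; names are illustrative only. */
final class TimeClaimSketch {
    private TimeClaimSketch() {
    }

    /** 'iat' fails only if it is after the check time plus the allowable skew. */
    static boolean issuedAtOk(OptionalDouble iatSeconds, boolean required, long whenCheckTimeMillis, int skewMillis) {
        requireNonNegative(skewMillis);
        if (!iatSeconds.isPresent())
            return !required;
        return toMillis(iatSeconds.getAsDouble()) <= whenCheckTimeMillis + skewMillis;
    }

    /** 'nbf' fails only if the check time plus the allowable skew is before it. */
    static boolean notBeforeOk(OptionalDouble nbfSeconds, boolean required, long whenCheckTimeMillis, int skewMillis) {
        requireNonNegative(skewMillis);
        if (!nbfSeconds.isPresent())
            return !required;
        return toMillis(nbfSeconds.getAsDouble()) <= whenCheckTimeMillis + skewMillis;
    }

    /** 'exp' fails if the check time minus the allowable skew is on or after it. */
    static boolean expirationOk(OptionalDouble expSeconds, boolean required, long whenCheckTimeMillis, int skewMillis) {
        requireNonNegative(skewMillis);
        if (!expSeconds.isPresent())
            return !required;
        return whenCheckTimeMillis - skewMillis < toMillis(expSeconds.getAsDouble());
    }

    /** For claim pairs that exist: nbf >= iat, exp > iat, exp > nbf. */
    static boolean timeConsistencyOk(OptionalDouble iat, OptionalDouble nbf, OptionalDouble exp) {
        if (iat.isPresent() && nbf.isPresent() && nbf.getAsDouble() < iat.getAsDouble())
            return false;
        if (iat.isPresent() && exp.isPresent() && exp.getAsDouble() <= iat.getAsDouble())
            return false;
        if (nbf.isPresent() && exp.isPresent() && exp.getAsDouble() <= nbf.getAsDouble())
            return false;
        return true;
    }

    // Claims are (possibly fractional) seconds since the epoch; compare in milliseconds.
    private static long toMillis(double seconds) {
        return Math.round(seconds * 1000.0);
    }

    // Stand-in for the OAuthBearerConfigException thrown on a negative clock skew.
    private static void requireNonNegative(int skewMillis) {
        if (skewMillis < 0)
            throw new IllegalArgumentException("allowable clock skew must not be negative");
    }
}
```

In this sketch a token whose 'exp' equals the check time exactly is rejected when the skew is zero, which matches the "on or after" wording of the Javadoc, while any positive skew would accept it.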
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer;
import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;
/**
* Utility class to help deal with
* <a href="https://tools.ietf.org/html/rfc6749#section-3.3">Access Token
* Scopes</a>
*/
public class OAuthBearerScopeUtils {
private static final Pattern INDIVIDUAL_SCOPE_ITEM_PATTERN = Pattern.compile("[\\x23-\\x5B\\x5D-\\x7E\\x21]+");
/**
* Return true if the given value meets the definition of a valid scope item as
* per <a href="https://tools.ietf.org/html/rfc6749#section-3.3">RFC 6749
* Section 3.3</a>, otherwise false
*
* @param scopeItem
* the mandatory scope item to check for validity
* @return true if the given value meets the definition of a valid scope item,
* otherwise false
*/
public static boolean isValidScopeItem(String scopeItem) {
return INDIVIDUAL_SCOPE_ITEM_PATTERN.matcher(Objects.requireNonNull(scopeItem)).matches();
}
/**
* Convert a space-delimited list of scope values (for example,
* <code>"scope1 scope2"</code>) to a List containing the individual elements
* (<code>"scope1"</code> and <code>"scope2"</code>)
*
* @param spaceDelimitedScope
* the mandatory (but possibly empty) space-delimited scope values,
* each of which must be valid according to
* {@link #isValidScopeItem(String)}
* @return the list of the given (possibly empty) space-delimited values
* @throws OAuthBearerConfigException
* if any of the individual scope values are malformed/illegal
*/
public static List<String> parseScope(String spaceDelimitedScope) throws OAuthBearerConfigException {
// etc...
}
private OAuthBearerScopeUtils() {
// empty
}
} |
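The body of parseScope is elided above, so the following is only a sketch of how the documented contract could be met, together with the "every required scope element must exist" check described for validateScope. The class name ScopeSketch, the method hasRequiredScope, the tolerance for leading, trailing, and repeated spaces, and the use of IllegalArgumentException in place of OAuthBearerConfigException are assumptions made for illustration; only the scope item pattern is taken from the listing above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;

/** Hypothetical sketch of scope parsing and checking; names are illustrative only. */
final class ScopeSketch {
    // Same character ranges as INDIVIDUAL_SCOPE_ITEM_PATTERN in the listing above.
    private static final Pattern SCOPE_ITEM = Pattern.compile("[\\x23-\\x5B\\x5D-\\x7E\\x21]+");

    private ScopeSketch() {
    }

    /** Split a space-delimited scope string into its validated items. */
    static List<String> parseScope(String spaceDelimitedScope) {
        String trimmed = Objects.requireNonNull(spaceDelimitedScope).trim();
        if (trimmed.isEmpty())
            return Collections.emptyList();
        List<String> items = new ArrayList<>();
        // Assumption: runs of spaces are treated as a single delimiter.
        for (String item : trimmed.split(" +")) {
            if (!SCOPE_ITEM.matcher(item).matches())
                throw new IllegalArgumentException("Invalid scope item: " + item);
            items.add(item);
        }
        return Collections.unmodifiableList(items);
    }

    /** True if every required scope element (if any) appears in the provided scope. */
    static boolean hasRequiredScope(List<String> scope, List<String> requiredScope) {
        if (requiredScope == null || requiredScope.isEmpty())
            return true;
        return scope != null && scope.containsAll(requiredScope);
    }
}
```

Under these assumptions, a token carrying the scope "read write" would satisfy a required scope of "read" but not a required scope of "read admin".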
Proposed Changes
Thanks to KIP-86, no changes to the existing code base are required – all functionality represents additions (rather than changes) to the existing code base.
Compatibility, Deprecation, and Migration Plan
...
There is no change to existing behavior.
Rejected Alternatives
...
Motivation
The idea of providing a toolkit of reusable JWT/JWS/JWE retrieval and validation functionality was discarded in favor of a single, simple unsecured JWS implementation. Anything beyond simple unsecured use cases requires significant functionality that is available via multiple open source libraries, and there is no need to duplicate that functionality here. It is also not desirable for the Kafka project to define a specific open source JWT/JWS/JWE library dependency; better to allow installations to use the library that they feel most comfortable with. This decision also allows the public-facing interface of this KIP to be considerably smaller than it would otherwise be.
Substitution Within Configuration Values
...
|
...
<<<NOTE: Please consider this to be open for debate at this point>>>
Unchecked exceptions can be added or deleted without breaking binary compatibility, so it would seem that OAuthBearerException should be unchecked and we should forego the compiler help provided by the use of checked exceptions. However, the set of exception types and the size of the overall code base are both small, and the risk that we would want to change the type(s) of exception(s) thrown is minimal.
...
<<<NOTE: Please consider this to be open for debate at this point>>>
...
|
Flexible, substitution-aware configuration was originally proposed in an early draft of this KIP but was separated out into its own KIP-269 Substitution Within Configuration Values because it was thought that such functionality might be more broadly useful. That KIP did not gain traction, in part because it was pointed out that most secret-related configuration could be done dynamically via the functionality provided by KIP-226 Dynamic Broker Configuration, which can help remove secrets from the configuration files themselves and is based on an event notification paradigm as opposed to the passive, pull-based one used by substitution. It was also pointed out that production implementations of SASL/OAUTHBEARER for Kafka as defined here would have to provide their own AuthenticateCallbackHandler implementations (as per KIP-86: Configurable SASL callback handlers), and those implementations can be as flexible as desired/required with respect to how they deal with secrets. In the end, this KIP has no dependency on substitution, and that KIP can live/die/resurrect – as the community sees fit – independently of this one.
Explicit Configuration of Token Refresh Class
We adjusted the logic in SaslChannelBuilder to automatically configure the org.apache.kafka.common.security.oauthbearer.internal.OAuthBearerRefreshingLogin class as the Login implementation by default for SASL/OAUTHBEARER use cases. We also decided not to unify the refresh logic required for both SASL/OAUTHBEARER and SASL/GSSAPI mechanisms as part of this KIP, though this unification could occur at some point in the future. The above-mentioned OAuthBearerRefreshingLogin class (which is NOT part of the public API) delegates to an underlying implementation in the same internal package, and this delegation-based approach suggests a potential way forward with regard to unification, but unification is explicitly out of scope here. The chosen delegation design along with automatic configuration allows several classes to be moved to the internal package, which helps to minimize the public-facing API for this KIP and creates a better out-of-the-box experience.
Callback Handlers and Callbacks
We decided to leverage the standard JAAS Callback/CallbackHandler mechanism for communicating information between various components (specifically, between the SASL Client and the Login Module, between the Login Module and the Login/Token Retrieval mechanism, and between the SASL Server and the Token Validation mechanism). We had originally documented the need for just the last two (token retrieval and token validation), and the original proposal was to declare the class names as part of the JAAS configuration. The only real benefit to doing this was that the declaration of the classes and their configuration were co-located in the JAAS configuration. We decided the benefit of consistency was at least as valuable as any cost associated with separating the declaration from the configuration, and probably more valuable given that this is how configuration is done for other SASL mechanisms and is therefore not unfamiliar. We also now leverage callback handlers in all three places where they are supported, though one of them (the SASL Client callback handler) has no need for explicit configuration in any known use case. Finally, we adjusted the logic in SaslChannelBuilder and LoginManager to automatically apply default AuthenticateCallbackHandler and Login implementations for SASL/OAUTHBEARER in the absence of explicit declarations, which simplifies the out-of-the-box experience for unsecured (i.e. development and testing) use cases.
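To make the Callback/CallbackHandler pattern concrete, here is a minimal sketch that uses only the standard javax.security.auth.callback types from the JDK. It is not the handler or callback defined by this KIP: TokenCallbackSketch, TokenRetrievalHandlerSketch, and the placeholder token value are hypothetical names invented for illustration, and a real handler would obtain the token from an authorization server rather than return a constant.

```java
import java.io.IOException;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.UnsupportedCallbackException;

/** Hypothetical callback through which a component asks for a token. */
final class TokenCallbackSketch implements Callback {
    private String token;

    String token() {
        return token;
    }

    void token(String token) {
        this.token = token;
    }
}

/** Hypothetical handler that answers token callbacks and rejects everything else. */
final class TokenRetrievalHandlerSketch implements CallbackHandler {
    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        for (Callback callback : callbacks) {
            if (callback instanceof TokenCallbackSketch) {
                // Placeholder value; a real handler would retrieve an actual access token here.
                ((TokenCallbackSketch) callback).token("placeholder-token");
            } else {
                throw new UnsupportedCallbackException(callback);
            }
        }
    }
}
```

The requesting component creates the callback, passes it to the handler, and reads the result back from the callback afterwards, so neither side needs to know the other's concrete class.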
We considered associating refresh-related properties (such as the minimum refresh period in milliseconds) with the ExpiringCredential rather than the ExpiringCredentialRefreshingLogin instance because the ExpiringCredentialRefreshingLogin instance couldn't know which of the potentially multiple login modules actually applies (i.e. which is the one associated with the inter-broker protocol); it wouldn't always know how to find the JAAS config options, so it wouldn't always know how to get the refresh configuration. There was a problem with this approach, though: we can't invoke login() on the LoginContext and get an ExpiringCredential instance without a CallbackHandler, so we needed to know the type of CallbackHandler to instantiate, and there is no way to know that. It simply made sense to give the ExpiringCredentialRefreshingLogin instance the ability to discover the correct login module in all cases and be able to ask it for the CallbackHandler instance; hence we created the ExpiringCredentialLoginModule interface.
...







