You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 74 Next »

Status

Current state: <<<DRAFT>>> (not yet "Under Discussion")

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: here [Change the link from KAFKA-1 to your own ticket]

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The ability to authenticate to Kafka with an OAuth 2 Access Token is desirable given the popularity of OAuth.  The introduction of KIP-86 (Configurable SASL callback handlers) makes it possible to non-intrusively add new SASL mechanisms, and OAUTHBEARER is the SASL mechanism for OAuth.  OAuth 2 is a flexible framework with multiple ways of doing the same thing (for example, getting a public key to confirm a digital signature), so pluggable implementations – and flexibility in their configuration – is a requirement.

This KIP proposes to add the following functionality related to SASL/OAUTHBEARER:

  • Allow clients (both brokers when SASL/OAUTHBEARER is the inter-broker protocol as well as non-broker clients) to flexibly retrieve an access token from an OAuth 2 authorization server according to JAAS-defined configuration (and potentially plugged-in implementation) and present the access token to a broker for authentication.
  • Allow brokers to flexibly validate provided access tokens when a client establishes a connection based on JAAS-defined configuration (and potentially plugged-in implementation).
  • Provide a toolkit of reusable functionality applicable to common token retrieval and validation scenarios so that implementations can either be reused or, when they are custom and plugged-in, be relatively small.
  • Allow clients to transparently retrieve a new access token in the background before the existing access token expires in case the client has to open new connections.  Any existing connections will always remain unaffected by this "token refresh" functionality so long as the connection remains intact, but new connections from the same client will always use the latest access token (either the initial one or the one that was most recently retrieved by the token refresh functionality, if any) – this is how Kerberos-authenticated connections work with respect to ticket expiration.

Note that the access token can be made available to the broker for authorization decisions due to KIP-189 (by exposing the access token on the SaslServer implementation), but detailed discussion of this possibility is outside the scope of this proposal.  It is noted, however, that if access tokens are somehow used for authorization decisions, it is conceivable due to the long-lived nature of Kafka connections that authorization decisions will sometimes be made using expired access tokens.  For example, it is up to the broker to validate the token upon authentication, but the token will not be replaced for that particular connection so long as it remains intact; if the token expires in an hour then authorization decisions for that first hour will be made using the still-valid token, but after an hour the expired token would remain associated with the connection, and authorization decisions from that point forward for that particular connection would be using an expired token.  This would have to be addressed via a separate KIP if it turns out to be problematic, but one is reminded of how code signing certificates that have been timestamped remain valid after their expiration.

 

Public Interfaces

Exceptions

See Rejected Alternatives: Exceptions

Exceptions

org.apache.kafka.common.security.oauthbearer.OAuthBearerException
package org.apache.kafka.common.security.oauthbearer;
 
/**
 * Base class for all exceptions thrown by the SASL/OAUTHBEARER code. Unlike the
 * unchecked {@link KafkaException} hierarchy, this hierarchy can be checked to
 * obtain compiler help because instances are never exposed to the core Kafka
 * code base and there is minimal risk of changing the set of thrown checked
 * exceptions and impacting contract/source code compatibility given the small
 * size of this code base.
 */
public abstract class OAuthBearerException extends Exception {
    // etc...
}
org.apache.kafka.common.security.oauthbearer.OAuthBearerConfigException
package org.apache.kafka.common.security.oauthbearer;
 
/**
 * Exception thrown when there is a problem with the configuration (an invalid
 * option in a JAAS config, for example).
 */
public class OAuthBearerConfigException extends OAuthBearerException {
    // etc...
}
org.apache.kafka.common.security.oauthbearer.OAuthBearerIllegalTokenException
package org.apache.kafka.common.security.oauthbearer;
 
/**
 * Exception thrown due to an illegal token (a supposed JWS or JWE compact
 * serialization that is grossly malformed or that yields JSON that cannot be
 * parsed, etc.)
 */
 
public class OAuthBearerIllegalTokenException extends OAuthBearerException {
    // etc...
}
org.apache.kafka.common.security.oauthbearer.OAuthBearerJwtClaimException
package org.apache.kafka.common.security.oauthbearer;
 
/**
 * Exception thrown when there is a problem with a JWT claim (a required claim
 * not being present, or a claim with an unexpected type, for example).
 */
 
public class OAuthBearerJwtClaimException extends OAuthBearerException {
    // etc...
}

Token Refresh

See Rejected Alternatives: Token Refresh

Token Refresh

We define the org.apache.kafka.common.security.oauthbearer.ExpiringCredential and org.apache.kafka.common.security.oauthbearer.ExpiringCredentialLoginModule interfaces and the org.apache.kafka.common.security.oauthbearer.ExpiringCredentialRefreshingLogin class that can refresh expiring credentials.  We represent many of the parameters that impact how the refresh algorithm operates as a single map of enum/value pairs rather than as separate methods – this helps ensure forward compatibility if/when we decide we want to add more potential parameters (having the flexibility to add more parameters without adding methods to an interface is especially important since Java 7 does not support default methods on interfaces – adding a new method breaks forward compatibnility).  We provide the org.apache.kafka.common.security.oauthbearer.RefreshConfig class to provide the methods that we did not add to any interface, and the org.apache.kafka.common.security.oauthbearer.ExpiringCredentialRefreshingLogin class will parse the properties from the JAAS config options (with appropriate defaults in case they are not explicitly specified).

Note that the token refresh functionality and the related classes and interfaces are not necessarily specific to OAuth Bearer Tokens, so an open question is whether this refresh-related code belongs in the same package as the SASL/OAUTHBEARER code or if it should live separately in its own package.  If it ends up in its own package then perpaps RefreshConfigProp.parseValue(Object) should not throw an OAuthBearerConfigException but rather a different exception type.

org.apache.kafka.common.security.oauthbearer.ExpiringCredential
package org.apache.kafka.common.security.oauthbearer;
 
/**
 * A credential that expires and that can potentially be refreshed
 *
 * @see ExpiringCredentialRefreshingLogin
 */
public interface ExpiringCredential {
    /**
     * The name of the principal to which this credential applies (used only for
     * logging)
     *
     * @return the always non-null/non-empty principal name
     */
    String principalName();
    /**
     * When the credential became valid, in terms of the number of milliseconds
     * since the epoch, if known otherwise null
     *
     * @return the time when the credential became valid, in terms of the number of
     *         milliseconds since the epoch, if known otherwise null
     */
    Long startTimeMillis();
 
    /**
     * When the credential expires, in terms of the number of milliseconds since the
     * epoch
     *
     * @return the time when the credential expires, in terms of the number of
     *         milliseconds since the epoch
     */
    long expireTimeMillis();
 
    /**
     * The point after which the credential can no longer be refreshed, in terms of
     * the number of milliseconds since the epoch, if any, otherwise null.
     *
     * @return the point after which the credential can no longer be refreshed, in
     *         terms of the number of milliseconds since the epoch, if any,
     *         otherwise null
     */
    Long absoluteLastRefreshMillis();
}
org.apache.kafka.common.security.oauthbearer.RefreshConfigProp
package org.apache.kafka.common.security.oauthbearer;

/**
 * Individual refresh-related configuration properties defining how
 * {@link ExpiringCredentialRefreshingLogin} refreshes instances of
 * {@link ExpiringCredential}. Each value has a type, optional min/max/default
 * values, an ordered list of string keys (containing the enum name and the
 * camelCase version of the name, in that order) so it can be picked out of a
 * map keyed with Strings, and the ability to parse a string or non-string value
 * associated with one of its string keys in a map.
 *
 * @see RefreshConfig
 */
public enum RefreshConfigProp {
    /**
     * Background login refresh thread will sleep until the specified window factor
     * relative to the credential's total lifetime has been reached, at which time
     * it will try to refresh the credential. The default value is 0.8 (80%).
     */
    REFRESH_WINDOW_FACTOR(Double.class, 0.5, 1.0, 0.8),
    /**
     * Amount of random jitter added to the background login refresh thread's sleep
     * time. The default value is 0.05 (5%).
     */
    REFRESH_WINDOW_JITTER(Double.class, 0.0, 0.25, 0.05),
    /**
     * The minimum time between checks by the background login refresh thread,
     * regardless of other constraints, in milliseconds. The default value is 60,000
     * (1 minute).
     */
    REFRESH_MIN_PERIOD_MILLIS(Long.class, 0L, 1000L * 60 * 15, 1000L * 60 * 1),
    /**
     * If the {@code LoginModule} and {@code SaslClient} implementations support
     * multiple simultaneous login contexts on a single {@code Subject} at the same
     * time. If true, then upon refresh, logout will only be invoked on the original
     * {@code LoginContext} after a new one successfully logs in. This can be
     * helpful if the original credential still has some lifetime left when an
     * attempt to refresh the credential fails; the client will still be able to
     * create new connections as long as the original credential remains valid.
     * Otherwise, if logout is immediately invoked prior to relogin, a relogin
     * failure leaves the client without the ability to connect until relogin does
     * in fact succeed. The default value is false.
     */
    RELOGIN_ALLOWED_BEFORE_LOGOUT(Boolean.class, Boolean.FALSE, Boolean.TRUE, Boolean.FALSE);
    private RefreshConfigProp(Class<? extends Comparable<?>> valueType, Comparable<?> minValue, Comparable<?> maxValue,
            Comparable<?> defaultValue) {
        // etc...
    }
 
    /**
     * Return the always non-null value type
     *
     * @return the always non-null value type
     */
    public Class<? extends Comparable<?>> valueType() {
        return valueType;
    }
    /**
     * Return the minimum value, if any, as an instance of the value type, otherwise
     * null
     *
     * @return the minimum value, if any, as an instance of the value type,
     *         otherwise null
     */
    public Comparable<?> minValue() {
        return minValue;
    }

    /**
     * Return the maximum value, if any, as an instance of the value type, otherwise
     * null
     *
     * @return the maximum value, if any, as an instance of the value type,
     *         otherwise null
     */
    public Comparable<?> maxValue() {
        return maxValue;
    }

    /**
     * Return the default value, if any, as an instance of the value type, otherwise
     * null
     *
     * @return the default value, if any, as an instance of the value type,
     *         otherwise null
     */
    public Comparable<?> defaultValue() {
        return defaultValue;
    }
 
    /**
     * The ordered list of string keys (containing the enum name and the camelCase
     * version of the name, in that order) so the instance can be picked out of a
     * map keyed with Strings
     * 
     * @return the always non-null/non-empty list of string keys (containing the
     *         enum name and the camelCase version of the name, in that order) that
     *         denote this instance in a {@code Map}
     */
    public List<String> stringKeys() {
        return stringKeys;
    }
 
    /**
     * Return the (potentially null) value as the value type
     *
     * @param value
     *            a (potentially null) value of either the value type or a type such
     *            that the result of invoking {@code valueOf()} on the value type
     *            class against the value results in the value converted to the
     *            value type
     * @return the (potentially null) value as the value type
     * @throws OAuthBearerConfigException
     *             if a non-null value cannot be converted to the value type while
     *             remaining consistent with any min/max constraints
     */
    public Comparable<?> parseValue(Object value) throws OAuthBearerConfigException {
        // etc...
    }
 
    // etc...
}
org.apache.kafka.common.security.oauthbearer.RefreshConfig
package org.apache.kafka.common.security.oauthbearer;
 
/**
 * Immutable refresh-related configuration for instances of
 * {@link ExpiringCredentialRefreshingLogin}. Configuration that is independent
 * of the actual credential itself and that can be defined as login module
 * options in a JAAS config should be stored here.
 */
public class RefreshConfig {
    /**
     * Default constructor with individual refresh configuration properties
     * being set to their default values
     */
    public RefreshConfig() {
        this(Collections.<String, String>emptyMap(), "");
    }

    /**
     * Constructor based on a map with keys being the String keys associated with
     * {@link RefreshConfigProp} instances and values being either Strings or
     * non-Strings. Individual refresh configuration properties that are not
     * explicitly set to a valid value on the given map will be set to their default
     * value for this instance.
     * 
     * @param configMap
     *            the mandatory (but possibly empty) configuration map upon which to
     *            build this instance
     * @see RefreshConfigProp#stringKeys()
     * @see RefreshConfigProp#parseValue(Object)
     */
    public RefreshConfig(Map<String, String> configMap) {
        this(configMap, "");
    }

    /**
     * Constructor based on a map with keys being the String keys associated with
     * {@link RefreshConfigProp} instances and values being either Strings or
     * non-Strings. Individual refresh configuration properties that are not
     * explicitly set to a valid value on the given map will be set to their default
     * value for this instance.
     *
     * @param configMap
     *            the mandatory (but possibly empty) configuration map upon which to
     *            build this instance
     * @param keyPrefix
     *            the mandatory (but potentially blank) prefix to prepend to String
     *            keys
     * @see RefreshConfigProp#stringKeys()
     * @see RefreshConfigProp#parseValue(Object)
     */
    public RefreshConfig(Map<String, String> configMap, String keyPrefix) {
        // etc...
    }
 
    public Map<RefreshConfigProp, Object> refreshConfigMap() {
        return refreshConfigMap;
    }

    public double refreshWindowFactor() {
        return refreshWindowFactor;
    }

    public double refreshWindowJitter() {
        return refreshWindowJitter;
    }

    public long refreshMinPeriodMillis() {
        return refreshMinPeriodMillis;
    }

    public boolean reloginAllowedBeforeLogout() {
        return reloginAllowedBeforeLogout;
    }

    // etc...
}
org.apache.kafka.common.security.oauthbearer.ExpiringCredentialLoginModule
package org.apache.kafka.common.security.oauthbearer;

import javax.security.auth.spi.LoginModule;

/**
 * An extension of the {@code LoginModule} interface that must be implemented by
 * any login module that generates an instance of {@link ExpiringCredential} to
 * be managed/refreshed by {@link ExpiringCredentialRefreshingLogin}. The
 * existence of this interface is necessary to deal with the case when there are
 * multiple enabled SASL mechanisms, one or more of which generate an expiring
 * credential, and a SASL mechanism (as opposed to authentication via SSL) is
 * used for inter-broker communication.
 * <p>
 * The following broker-side JAAS configuration helps illustrate the need for
 * this interface:
 *
 * <pre>
 * KafkaServer {
 *      org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
 *      // etc...;
 *      some.othersaslmechanism.OtherSaslMechanismLoginModule Optional
 *      // etc...;
 * };
 * </pre>
 *
 * The {@code LoginContext} instance will initialize both login modules and ask
 * them both to login -- that's how JAAS works -- regardless of which SASL
 * mechanism is used for inter-broker communication. It is imperative that the
 * login succeeds for the login module associated with the mechanism configured
 * for inter-broker communication; it doesn't matter if any other mechanisms
 * fail because they aren't actually being used for client-side work (they are
 * only being used for the server side of the SASL handshake, and that is
 * performed by the {@code SaslServer} instance rather than the
 * {@code LoginModule} instance). This has 2 implications:
 * <p>
 * <ol>
 * <li>The {@code CallbackHandler} instance provided to the {@code LoginContext}
 * instance (which then passes it to all of the login modules) must be the
 * correct one for the login module that must succeed; it does not have to be
 * the correct one for any others.
 * <li>The login modules that don't have to succeed (because they aren't being
 * used for inter-broker communication) should be marked {@code Optional} in
 * case they fail.
 * </ol>
 * <p>
 * This raises the critical issue of how any instance of {@link Login} can know
 * which {@code CallbackHandler} instance to instantiate. The
 * {@link ScramLoginModule} and {@link PlainLoginModule} don't use the callback
 * handler, but {@code com.sun.security.auth.module.Krb5LoginModule} does, and
 * in fact {@link LoginCallbackHandler} serves the purpose of short-circuiting
 * any request for user interaction in that case. So this issue hasn't been
 * critical in the past; it is only now becoming more important.
 * <p>
 * All the {@code Login} instance knows is the SASL mechanism enabled for
 * inter-broker communication (from the broker config) and the JAAS
 * configuration (for example, the one shown above). It cannot know which
 * {@code CallbackHandler} instance to instantiate from just that information
 * because it cannot determine from that information alone which of the login
 * modules handles the declared inter-broker communication mechanism.
 * <p>
 * We thus arrive at the need for this interface. A {@code Login} instance can
 * look at all of the declared login module classes, determine which of them
 * implements this interface, and then for each of those it can instantiate an
 * instance using the default constructor and ask for its applicable mechanisms
 * via {@link #mechanisms()}. If it finds a login module with an applicable SASL
 * mechanism matching the one being used for inter-broker communication it can
 * then ask for a {@code CallbackHandler} instance via
 * {@link #newCallbackHandler()}. Otherwise, if it doesn't find a match, it just
 * instantiates an instance of {@link LoginCallbackHandler}, which is what
 * AbstractLogin#login()} currently creates and is required for the initial
 * Kerberos login attempt for short-circuit behavior as mentioned above.
 */
public interface ExpiringCredentialLoginModule extends LoginModule {
    /**
     * Return the set of SASL mechanisms this login module applies to
     *
     * @return the set of SASL mechanisms this login module applies to
     */
    Set<String> mechanisms();

    /**
     * Return a new {@code CallbackHandler} instance appropriate for this login
     * module when one of its supported mechanisms as returned by
     * {@link #mechanisms()} is the SASL mechanism for inter-broker communication.
     *
     * @return a new {@code CallbackHandler} instance appropriate for this login
     *         module when one of its supported mechanisms as returned by
     *         {@link #mechanisms()} is the SASL mechanism for inter-broker
     *         communication.
     */
    CallbackHandler newCallbackHandler();
} 
org.apache.kafka.common.security.oauthbearer.ExpiringCredentialRefreshingLogin
package org.apache.kafka.common.security.oauthbearer;

import org.apache.kafka.common.security.auth.Login;

/**
 * This class is responsible for refreshing logins for both Kafka client and
 * server when the login is a type that has a limited lifetime and will expire. The
 * credentials for the login must implement {@link ExpiringCredential}, and the
 * {@code LoginModule} must implement {@link ExpiringCredentialLoginModule}.
 */
public class ExpiringCredentialRefreshingLogin implements Login {
    // etc...
 
    public RefreshConfig refreshConfig() {
        return refreshConfig;
    }
 
    // etc...
} 

 

OAuth Bearer Tokens

Every OAuth Bearer Token is a string representing an authorization issued to the client.  There are at least two types of OAuth Bearer Tokens: an opaque token and a JSON Web Token (JWT).  Nothing can be said about the contents of an opaque token other than that the contents are only available via some third party (e.g. an authorization server).  Every JWT contains both a JSON Object Signing and Encryption (JOSE) Header and a JWT Claims Set.  There are two distinct types of JWTs: a JSON Web Signature (JWS) and a JSON Web Encryption (JWE).  There is no intent to model a JWE with the initial implementation.

 

OAuth Bearer Tokens

org.apache.kafka.common.security.oauthbearer.OAuthBearerToken
package org.apache.kafka.common.security.oauthbearer;
 /**
 * The <code>b64token</code> value as defined in
 * <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750 Section
 * 2.1</a> along with the token's specific scope and lifetime.
 *
 * @see <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC 6749
 *      Section 1.4</a> and
 *      <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750
 *      Section 2.1</a>
 */
public interface OAuthBearerToken extends ExpiringCredential {
    /**
     * The <code>b64token</code> value as defined in
     * <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750 Section
     * 2.1</a>
     *
     * @return <code>b64token</code> value as defined in
     *         <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750
     *         Section 2.1</a>
     */
    String value();

    /**
     * The token's scope of access, as per
     * <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC 6749 Section
     * 1.4</a>
     *
     * @return the token's (always non-null but potentially empty) scope of access,
     *         as per <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC
     *         6749 Section 1.4</a>. Note that all values in the returned set will
     *         be trimmed of preceding and trailing whitespace, and the result will
     *         never contain the empty string.
     * @throws IOException
     *             if one or more required networked resources (e.g. to re-hydrate
     *             an opaque token) is unavailable.
     * @throws OAuthBearerException
     *             if there is something fundamentally wrong with the token (if it
     *             is malformed, for example)
     */
    Set<String> scope() throws IOException, OAuthBearerException;
}
org.apache.kafka.common.security.oauthbearer.OAuthBearerJwt
package org.apache.kafka.common.security.oauthbearer;

/**
 * A JSON Web Token, which may either be a JWS or a JWE. Neither signature
 * validation nor decryption are performed here.
 *
 * @see <a href="https://tools.ietf.org/html/rfc7519">RFC 7519</a>
 */
public abstract class OAuthBearerJwt implements OAuthBearerToken {
    /**
     * Constructor with no scope claim name
     *
     * @param compactSerialization
     *            the compact serialization to parse as a JWT
     * @throws OAuthBearerIllegalTokenException
     *             if compactSerialization cannot be either a JWS or JWE (due to
     *             there not being either 3 or 5 dot-separated values; or the JWT
     *             header either not being a valid Base64 URL-encoded value or not
     *             valid JSON after decoding)
     */
    public OAuthBearerJwt(String compactSerialization) throws OAuthBearerIllegalTokenException {
        this(compactSerialization, null);
    }

    /**
     * Constructor with the given scope claim name
     *
     * @param compactSerialization
     *            the compact serialization to parse as a JWT
     * @param scopeClaimName
     *            the optional scope claim name
     * @throws OAuthBearerIllegalTokenException
     *             if compactSerialization cannot be either a JWS or JWE (due to
     *             there not being either 3 or 5 dot-separated values; or the JWT
     *             header either not being a valid Base64 URL-encoded value or not
     *             valid JSON after decoding)
     */
    public OAuthBearerJwt(String compactSerialization, String scopeClaimName) throws OAuthBearerIllegalTokenException {
        // etc...
    }
 
    /**
     * Return the 3 or 5 dot-separated sections of the JWT compact serialization
     * 
     * @return the 3 or 5 dot-separated sections of the JWT compact serialization
     */
    public List<String> splits() {
        return splits;
    }

    /**
     * Return the JOSE Header as a {@code Map}
     *
     * @return the JOSE header
     */
    public Map<String, Object> header() {
        return header;
    }

    /**
     * Return the JWT Claim Set as a {@code Map}
     *
     * @return the (always non-null but possibly empty) claims
     */
    public abstract Map<String, Object> claims();

    /**
     * Return the scope claim name, if any, otherwise null
     *
     * @return the scope claim name, if any, otherwise null
     */
    public String scopeClaimName() {
        return scopeClaimName;
    }

    /**
     * Indicate if the claim exists and is the given type
     *
     * @param claimName
     *            the mandatory JWT claim name
     * @param type
     *            the mandatory type, which should either be String.class,
     *            Number.class, or List.class
     * @return true if the claim exists and is the given type, otherwise false
     */
    public boolean isClaimType(String claimName, Class<?> type) {
        // etc...
    }

    /**
     * Extract a claim of the given type
     *
     * @param claimName
     *            the mandatory JWT claim name
     * @param type
     *            the mandatory type, which must either be String.class,
     *            Number.class, or List.class
     * @return the claim if it exists, otherwise null
     * @throws OAuthBearerJwtClaimException
     *             if the claim exists but is not the given type
     */
    public <T> T getClaim(String claimName, Class<T> type) throws OAuthBearerJwtClaimException {
        // etc...
    }

    /**
     * Extract a claim that could be either a String or a String List as a String
     * List (if it was a String it will be returned as a list of size 1).
     *
     * @param claimName
     *            the mandatory JWT claim name
     * @return the claim in the form of a String List, if it exists, otherwise null
     * @throws OAuthBearerJwtClaimException
     *             if the claim exists but is neither a String nor a List
     */
    public List<String> getListClaimFromStringOrList(String claimName) throws OAuthBearerJwtClaimException {
        // etc...
    }
 
    /**
     * Extract a claim in its raw form
     *
     * @param claimName
     *            the mandatory JWT claim name
     * @return the raw claim value, if it exists, otherwise null
     */
    public Object getRawClaim(String claimName) {
        return claims().get(Objects.requireNonNull(claimName));
    }

    /**
     * Return the
     * <a href="https://tools.ietf.org/html/rfc7519#section-4.1.3">Audience</a>
     * claim as a String List
     *
     * @return the <a href=
     *         "https://tools.ietf.org/html/rfc7519#section-4.1.3">Audience</a>
     *         claim as a String List if available, otherwise null. An Audience
     *         claim that is a String will be returned as a List of size 1.
     * @throws OAuthBearerJwtClaimException
     *             if the claim value is the incorrect type
     */
    public List<String> audience() throws OAuthBearerJwtClaimException {
        return getListClaimFromStringOrList(JwtClaim.AUDIENCE.claimName());
    }

    /**
     * Return the
     * <a href="https://tools.ietf.org/html/rfc7519#section-4.1.4">Expiration
     * Time</a> claim
     *
     * @return the <a href=
     *         "https://tools.ietf.org/html/rfc7519#section-4.1.4">Expiration
     *         Time</a> claim if available, otherwise null
     * @throws OAuthBearerJwtClaimException
     *             if the claim value is the incorrect type
     */
    public Number expirationTime() throws OAuthBearerJwtClaimException {
        return getClaim(JwtClaim.EXPIRATION_TIME.claimName(), Number.class);
    }

    /**
     * Return the <a href="https://tools.ietf.org/html/rfc7519#section-4.1.6">Issued
     * At</a> claim
     *
     * @return the
     *         <a href= "https://tools.ietf.org/html/rfc7519#section-4.1.6">Issued
     *         At</a> claim if available, otherwise null
     * @throws OAuthBearerJwtClaimException
     *             if the claim value is the incorrect type
     */
    public Number issuedAt() throws OAuthBearerJwtClaimException {
        return getClaim(JwtClaim.ISSUED_AT.claimName(), Number.class);
    }
 
    /**
     * Return the
     * <a href="https://tools.ietf.org/html/rfc7519#section-4.1.1">Issuer</a> claim
     *
     * @return the <a href=
     *         "https://tools.ietf.org/html/rfc7519#section-4.1.1">Issuer</a> claim
     *         if available, otherwise null
     * @throws OAuthBearerJwtClaimException
     *             if the claim value is the incorrect type
     */
    public String issuer() throws OAuthBearerJwtClaimException {
        return getClaim(JwtClaim.ISSUER.claimName(), String.class);
    }

    /**
     * Return the <a href="https://tools.ietf.org/html/rfc7519#section-4.1.7">JWT
     * ID</a> claim
     *
     * @return the <a href= "https://tools.ietf.org/html/rfc7519#section-4.1.7">JWT
     *         ID</a> claim if available, otherwise null
     * @throws OAuthBearerJwtClaimException
     *             if the claim value is the incorrect type
     */
    public String jwtId() throws OAuthBearerJwtClaimException {
        return getClaim(JwtClaim.JWT_ID.claimName(), String.class);
    }

    /**
     * Return the <a href="https://tools.ietf.org/html/rfc7519#section-4.1.5">Not
     * Before</a> claim
     *
     * @return the <a href= "https://tools.ietf.org/html/rfc7519#section-4.1.5">Not
     *         Before</a> claim if available, otherwise null
     * @throws OAuthBearerJwtClaimException
     *             if the claim value is the incorrect type
     */
    public Number notBefore() throws OAuthBearerJwtClaimException {
        return getClaim(JwtClaim.NOT_BEFORE.claimName(), Number.class);
    }

    /**
     * Return the
     * <a href="https://tools.ietf.org/html/rfc7519#section-4.1.2">Subject</a> claim
     *
     * @return the <a href=
     *         "https://tools.ietf.org/html/rfc7519#section-4.1.2">Subject</a> claim
     *         if available, otherwise null
     * @throws OAuthBearerJwtClaimException
     *             if the claim value is the incorrect type
     */
    public String subject() throws OAuthBearerJwtClaimException {
        return getClaim(JwtClaim.SUBJECT.claimName(), String.class);
    }
 
    /**
     * Decode the given Base64URL-encoded value, parse the resulting JSON as a JSON
     * object, and return the map of member names to their values (each value being
     * represented as either a String, a Number, or a List of Strings).
     *
     * @param split
     *            the value to decode and parse
     * @return the map of JSON member names to their String, Number, or String List
     *         value
     * @throws OAuthBearerIllegalTokenException
     *             if the given Base64URL-encoded value cannot be decoded or parsed
     */
    public static Map<String, Object> toMap(String split) throws OAuthBearerIllegalTokenException {
        // etc...
    }
}
org.apache.kafka.common.security.oauthbearer.OAuthBearerJws
package org.apache.kafka.common.security.oauthbearer;
 
/**
 * @see <a href="https://tools.ietf.org/html/rfc7515">RFC 7515</a>. Note that
 *      digital signature validation is not performed.
 */
public class OAuthBearerJws extends OAuthBearerJwt {
    /**
     * Constructor with no scope claim name
     *
     * @param compactSerialization
     *            the compact serialization to parse as a JWT
     * @throws OAuthBearerIllegalTokenException
     *             if the compact serialization is not a valid JWS (meaning it did
     *             not have 3 dot-separated Base64URL sections; or the header or
     *             claims either are not valid Base 64 URL encoded values or are not
     *             JSON after decoding; or the existence or absence of a digital
     *             signature is inconsistent with the mandatory 'alg' header value)
     */
    public OAuthBearerJws(String compactSerialization) throws OAuthBearerIllegalTokenException {
        this(compactSerialization, null);
    }

    /**
     * Constructor with the given scope claim name
     *
     * @param compactSerialization
     *            the compact serialization to parse as a JWS
     * @param scopeClaimName
     *            the optional scope claim name
     * @throws OAuthBearerIllegalTokenException
     *             if compactSerialization cannot be either a JWS or JWE (due to
     *             there not being either 3 or 5 dot-separated values; or the JWT
     *             header either not being a valid Base64 URL-encoded value or not
     *             valid JSON after decoding)
     */
    public OAuthBearerJws(String compactSerialization, String scopeClaimName) throws OAuthBearerIllegalTokenException {
        // etc...
    }
}
org.apache.kafka.common.security.oauthbearer.JwtHeaderParameter
package org.apache.kafka.common.security.oauthbearer;

/**
 * JSON Web Signature and Encryption Header Parameters
 *
 * @see <a href="https://www.iana.org/assignments/jose/jose.xhtml">JSON Web
 *      Signature and Encryption Header Parameters</a>
 */
public enum JwtHeaderParameter {
    ALGORITHM("alg"),
    JWK_SET_URL("jku"),
    JSON_WEB_KEY("jwk"),
    KEY_ID("kid"),
    X_509_URL("x5u"),
    X_509_CERTIFICATE_CHAIN("x5c"),
    X_509_CERTIFICATE_SHA1_THUMBPRINT("x5t"),
    X_509_CERTIFICATE_SHA256_THUMBPRINT("x5t#S256"),
    TYPE("typ"),
    CONTENT_TYPE("cty"),
    CRITICAL("crit"),
    ENCRYPTION_ALGORITHM("enc"),
    COMPRESSION_ALGORITHM("zip"),
    ISSUER("iss"),
    SUBJECT("sub"),
    AUDIENCE("aud");

    private String headerParameterName;

    private JwtHeaderParameter(String headerParameterName) {
        this.headerParameterName = headerParameterName;
    }

    String headerParameterName() {
        return headerParameterName;
    }
}
org.apache.kafka.common.security.oauthbearer.JwtClaim
package org.apache.kafka.common.security.oauthbearer;

/**
 * JSON Web Token Claims
 *
 * @see <a href="https://www.iana.org/assignments/jwt/jwt.xhtml">JSON Web Token
 *      Claims</a>
 */
public enum JwtClaim {
    ISSUER("iss"),
    SUBJECT("sub"),
    AUDIENCE("aud"),
    EXPIRATION_TIME("exp"),
    NOT_BEFORE("nbf"),
    ISSUED_AT("iat"),
    JWT_ID("jti");

    private String claimName;

    private JwtClaim(String claimName) {
        this.claimName = claimName;
    }

    public String claimName() {
        return claimName;
    }
}

Substitutable Module Options

Substitutable Module Options

The mechanics of token retrieval and token validation (both described later) will differ across OAuth deployments.  For example, for token retrieval, each deployment will inject credentials to the token endpoint differently, and the parameters sent to the token endpoint may also differ.  Token validation will also differ because OAuth supports multiple methods of validation.  Configuration of the retrieval and validation mechanisms, which are done via JAAS configuration, must therefore be flexible.  In particular, while we collaboratively use instances implementing javax.security.auth.Callback and javax.security.auth.CallbackHandler to retrieve information, we can't know in advance what information will be required in order to retrieve or validate a token; a username and password might – or might not – be required, for example, and retrieval and validation will likely require much more as well.  We also don't know where this information will come from: a file?  an environment variable?  a database?  Somewhere else?  We need implementations of the Callback and CallbackHandler interfaces that are just as flexible as we need the JAAS configuration to be.

JAAS config options (each of which is an individual element of the space-separated ModuleOptions list), in combination with appropriate Callback and CallbackHandler implementations, will support arbitrarily complex retrieval and substitution inside the option value.  The JAAS Configuration spec already supports system property substitution via the ${system.property} syntax; we will implement support for arbitrarily complex substitutions.  For example, the following would support substitution of the contents of a file (which is a common way to store secrets, especially within containers) into the option value:

thePassword="$[file/doNotLog/notBlank/=/path/to/secrets/the_secret]"

There are several features here that deserve comment:

  • The "$[" and "]" delimiters are the signal to perform a substitution (we can't use "${" and "}" because that is already defined by the JAAS Configuration spec to mean system property substitution).  Note that to perform substitution within a substitution we will also support "$[[" and "]]", "$[[[" and "]]]", etc. as delimiters (e.g. "prefix_$[[outer_$[inner]]]_suffix", where the nesting capability seems most likely to be used to specify default values as introduced below). 
  • Immediately inside the opening delimiter is the type of substitution followed by any constraints we wish to apply.  In the above, we identify this as a file substitution and we indicate two constraints: the resulting value should never be logged (i.e. store it as an instance of org.apache.kafka.common.config.types.Password instead of as a String); and the contents must not be blank (meaning it must not be empty or only contain whitespace).  It is an error if any of the specified constraints are violated.
  • Immediately after the type of substitution and any optional constraints is an equal sign ("=") followed by the value (which in the case of the "file" type is interpreted as the filename to read); then ultimately the closing delimiter appears.

This scheme is flexible and powerful; it handles most cases, but it remains relatively easy to create and read.  Importantly, the types of replacements can be expanded in the future without breaking compatibility.

How would we retrieve the above value?  We define the following representation of the module options and their substitution state along with associated Callback and CallbackHandler implementations:

org.apache.kafka.common.security.oauthbearer.SubstitutableModuleOptions
package org.apache.kafka.common.security.oauthbearer;

/**
 * Holds state regarding which {@code LoginModule} <a href=
 * "https://docs.oracle.com/javase/9/docs/api/javax/security/auth/login/Configuration.html">module
 * options</a> have been substituted. Instances of this class are thread-safe.
 *
 * @see SubstitutableModuleOptionsCallbackHandler
 * @see SubstitutableModuleOptionsCallback
 */
public class SubstitutableModuleOptions {
    /**
     * Constructor
     *
     * @param moduleOptionsMap
     *            the mandatory map representation of the <a href=
     *            "https://docs.oracle.com/javase/9/docs/api/javax/security/auth/login/Configuration.html">module
     *            options</a>
     */
    public SubstitutableModuleOptions(Map<String, String> moduleOptionsMap) {
        this.moduleOptionsMap = Collections.unmodifiableMap(Objects.requireNonNull(moduleOptionsMap));
    }

    /**
     * Return (an unmodifiable copy of) the original module options map provided
     * during construction
     *
     * @return (an unmodifiable copy of) the original module options map provided
     *         during construction
     */
    Map<String, String> moduleOptionsMap() {
        return moduleOptionsMap;
    }
 
    /**
     * Return an unmodifiable map identifying which module options have been
     * processed for substitution and the resulting value after substitution (if
     * any) was applied. A module option is guaranteed to have been processed for
     * substitution and its name will appear as a key in the returned map only after
     * {@link #setSubstitutionValue(String, String)} or
     * {@link #setSubstitutionValue(String, Password)} is called.
     *
     * @return an unmodifiable map identifying which module options have been
     *         processed for substitution and the resulting value after substitution
     *         (if any) was applied
     */
    Map<String, Object> moduleOptionSubstitutionState() {
        return Collections.unmodifiableMap(moduleOptionSubstitutionState);
    }
 
    /**
     * Identify that the option with the given name has had substitution performed
     * for it resulting in the given non-password value. This method can be
     * successfully invoked (and is idempotent) only if invoked before
     * {@link #setSubstitutionValue(String, Password)}; the option value cannot be
     * changed.
     *
     * @param optionName
     *            the mandatory option name, which must exist in the map returned by
     *            {@link #moduleOptionsMap()}
     * @param substitutionValue
     *            the mandatory substitution value to set
     */
    public void setSubstitutionValue(String optionName, String substitutionValue) {
        setSubstitutionValueInternal(Objects.requireNonNull(optionName), Objects.requireNonNull(substitutionValue));
    }
 
    /**
     * Identify that the option with the given name has had substitution performed
     * for it resulting in the given password-related value. This method can be
     * successfully invoked (and is idempotent) only if invoked before
     * {@link #setSubstitutionValue(String, String)}; the option value cannot be
     * changed.
     *
     * @param optionName
     *            the mandatory option name, which must exist in the map returned by
     *            {@link #moduleOptionsMap()}
     * @param substitutionValue
     *            the mandatory substitution value to set
     */
    public void setSubstitutionValue(String optionName, Password substitutionValue) {
        setSubstitutionValueInternal(Objects.requireNonNull(optionName), Objects.requireNonNull(substitutionValue));
    }
 
    // etc..
}
org.apache.kafka.common.security.oauthbearer.SubstitutableModuleOptionsCallback
package org.apache.kafka.common.security.oauthbearer;

/**
 * A {@code Callback} related to introspection requests against a JAAS
 * configuration
 *
 * @see SubstitutableModuleOptionsCallbackHandler
 * @see SubstitutableModuleOptions
 */
public class SubstitutableModuleOptionsCallback implements Callback {
    /**
     * Constructor
     *
     * @param substitutableModuleOptions
     *            the mandatory substitutable module options
     * @param optionName
     *            the mandatory option name, which must exist as a module option
     *            name in the given substitutable module options
     */
    public SubstitutableModuleOptionsCallback(SubstitutableModuleOptions substitutableModuleOptions,
            String optionName) {
        // etc...
    }
 
    /**
     * Return the substitutable module options provided at construction time
     *
     * @return the substitutable module options provided at construction time
     */
    public SubstitutableModuleOptions substitutableModuleOptions() {
        return substitutableModuleOptions;
    }

    /**
     * Return the option name provided at construction time
     *
     * @return the option name provided at construction time
     */
    public String optionName() {
        return optionName;
    }
 
    /**
     * Identify that the option identified by {@link #optionName()}, on the instance
     * returned by {@link #substitutableModuleOptions()}, has had substitution
     * performed for it resulting in the given non-password value. This method can
     * be successfully invoked (and is idempotent) only when the option value isn't
     * being changed.
     *
     * @param substitutionValue
     *            the mandatory non-password substitution value to set
     */
    public void setSubstitutionValue(String substitutionValue) {
        substitutableModuleOptions.setSubstitutionValue(optionName, Objects.requireNonNull(substitutionValue));
    }
 
    /**
     * Identify that the option identified by {@link #optionName()}, on the instance
     * returned by {@link #substitutableModuleOptions()}, has had substitution
     * performed for it resulting in the given password-related value. This method
     * can be successfully invoked (and is idempotent) only when the option value
     * isn't being changed.
     *
     * @param substitutionValue
     *            the mandatory password-related substitution value to set
     */
    public void setSubstitutionValue(Password substitutionValue) {
        substitutableModuleOptions.setSubstitutionValue(optionName, Objects.requireNonNull(substitutionValue));
    }
 
    /**
     * Return the substitution value, if any has been set, otherwise null. Any
     * non-null value will be either a {@code String} or a {@code Password}. Note
     * that the value may not have been set via a call to
     * {@link #setSubstitutionValue(Password)} or
     * {@link #setSubstitutionValue(String)}; it is possible that the value was
     * already set prior to construction of this instance, in which case that value
     * will be returned here.
     *
     * @return the substitution value, if any has been set, otherwise null
     */
    public Object substitutionValue() {
        return substitutableModuleOptions.moduleOptionSubstitutionState().get(optionName);
    }
 
    // etc...
}
org.apache.kafka.common.security.oauthbearer.SubstitutableModuleOptionsCallbackHandler
package org.apache.kafka.common.security.oauthbearer;

/**
 * A {@code CallbackHandler} that handles introspection requests against a JAAS
 * configuration
 *
 * @see SubstitutableModuleOptionsCallback
 * @see SubstitutableModuleOptions
 */
public class SubstitutableModuleOptionsCallbackHandler implements CallbackHandler {
    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        for (Callback callback : callbacks) {
            if (callback instanceof SubstitutableModuleOptionsCallback) {
                SubstitutableModuleOptionsCallback substitutableModuleOptionsCallback = (SubstitutableModuleOptionsCallback) callback;
                // only perform substitution if it has not yet been done
                if (substitutableModuleOptionsCallback.substitutionValue() != null)
                    continue;
                String optionName = substitutableModuleOptionsCallback.optionName();
                SubstitutableModuleOptions substitutableModuleOptions = substitutableModuleOptionsCallback
                        .substitutableModuleOptions();
                String rawValue = substitutableModuleOptions.moduleOptionsMap().get(optionName);
                if (rawValue != null) {
                    Object substitutionValue = getSubstitutionValue(optionName, rawValue);
                    if (substitutionValue instanceof String)
                        substitutableModuleOptionsCallback.setSubstitutionValue((String) substitutionValue);
                    else
                        substitutableModuleOptionsCallback.setSubstitutionValue((Password) substitutionValue);
                }
            } else
                throw new UnsupportedCallbackException(callback,
                        String.format("Unrecognized Callback type: %s", callback.getClass().getName()));
        }
    }
 
    /*
     * Handle substitution, dealing with circular references, constraints, etc.
     */
    private Object getSubstitutionValue(String optionName, String rawValue) throws IOException {
        // etc...
    }
 
    // etc...
}

Given the above code, and assumng the existence of the following mapping in the module options map:

    thePassword="$[file/doNotLog/notBlank/=/path/to/secrets/the_secret]"

we can retrieve the contents of the file as follows:

Retrieving Values
SubstitutableModuleOptions substitutableModuleOptions = new SubstitutableModuleOptions(moduleOptionsMap);
SubstitutableModuleOptionsCallback callback = new SubstitutableModuleOptionsCallback(substitutableModuleOptions,
        "thePassword");
substitutableModuleOptionsCallbackHandler.handle(new Callback[] {callback});
Object substitutionValue = callback.substitutionValue();
String thePassword = substitutionValue instanceof String ? (String) substitutionValue
        : ((Password) substitutionValue).value();

The initial set of supported substitution types and their specifiable constraints are as follows:

TypeDescriptionSpecifiable ConstraintsNotes
fileFile content substitutionnotBlank, notEmpty, optionalExist, doNotLog, defaultValueIt is an error if the file does not exist or is not readable unless optionalExist is specified. If a defaultValue is specified then optionalExist is implied, and the default value will be used and checked against any notBlank or notEmpty constraints that exist if those constraints are violated by the previously-determined value. If the filename or the default value depend on substitutions that were marked doNotLog then doNotLog is implied.
envvarEnvironment variable substitutionnotBlank, notEmpty, optionalExist, doNotLog, defaultValueIt is an error if the environment variable does not exist unless optionalExist is specified. If a defaultValue is specified then optionalExist is implied, and the default value will be used and checked against any notBlank or notEmpty constraints that exist if those constraints are violated by the previously-determined value. If the environment variable name or the default value depend on substitutions that were marked doNotLog then doNotLog is implied.
optionAnother module option value substitutionnotBlank, notEmpty, defaultValueIt is always an error if the specified option does not exist. The default value will be used and checked against any notBlank or notEmpty constraints that exist if those constraints are violated by the previously-determined value. If the option value (whether determined via a default value or not) depends on substitutions that were marked doNotLog then doNotLog is implied.



Token Retrieval

Token Retrieval

Token retrieval occurs on the client side of the SASL negotiation (the producer/consumer, or the broker when SASL/OAUTHBEARER is the inter-broker protocol), and the org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule class is the javax.security.auth.spi.LoginModule implementation that creates and invokes an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenRetriever to perform the retrieval.

 

Token Validation

 

Proposed Changes

First, in the spirit of KIP-86, all code will be delivered at or under the org.apache.kafka.common.security.oauthbearer package.  

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?

None

  • If we are changing behavior how will we phase out the older behavior?

There is no change to existing behavior

Rejected Alternatives

Exceptions

<<<NOTE: Please consider this to be open for debate at this point>>>

Unchecked exceptions can be added or deleted without breaking binary compatibility, so it would seem that OAuthBearerException should be unchecked and we should forego the compiler help provided by the use of checked exceptions.  However, the set of exception types and the size of the overall code base are both small, and the risk that we would want to change the type(s) of exception(s) thrown is minimal.

Token Refresh

<<<NOTE: Please consider this to be open for debate at this point>>>

We implement org.apache.kafka.common.security.oauthbearer.ExpiringCredential as an interface rather than as a class because an interface creates less of a constraint – implementers of other types of expiring credentials remain free to extend any class they want.  The risk of having to add a new method while Java 7 is still a supported target (which would break forward compatibility) is low.

We considered associating refresh-related properties (such as the minimum refresh period in milliseconds) with the ExpiringCredential rather than the ExpiringCredentialRefreshingLogin instance because the ExpiringCredentialRefreshingLogin instance couldn't know which of the potentially multiple login modules actually applies (i.e. which is the one associated with the inter-broker protocol), so it wouldn't always know how to find the JAAS config options.  This caused a problem, though: we can't invoke login on the LoginContext and get an ExpiringCredential instance without a CallbackHandler, so we needed to know the type of CallbackHandler to instantiate.  It simply made sense to give the ExpiringCredentialRefreshingLogin instance the ability to discover the correct login module in all cases; hence we created the ExpiringCredentialLoginModule interface.

 

  • No labels