Current state: Under Discussion

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Kafka already has user configurable policies which can be used by a cluster administrator to limit how the cluster can be modified by non-administrator users (for example by using the AdminClient API):

As existing tools are migrated to using AdminClient APIs rather than interacting directly with ZooKeeper we need to apply policies to them, but the existing policy interfaces make it difficult to do this in a consistent way.

Example 1

Currently the topic config is passed to the CreateTopicPolicy, but if a topic config is later modified the AlterConfigPolicy is applied. If an administrator wants to use the topic config in their policy decisions they have to implement this logic in two places. If the policy decision depends on both the topic config and another aspect of the topic the AlterConfigPolicy interface doesn't provide the necessary information.

Example 2

Changing the number of partitions in a topic was the subject of KIP-195 and is just one kind of topic modification. Consider two example use cases:

  1. It shouldn't be possible to create a topic, but then modify it so that it no longer conforms to the CreateTopicPolicy.
  2. An administrator who wants to prevent increasing the number of partitions entirely for topics with keys, because of the effect on partitioning.

To solve 1, we could simply apply the existing TopicCreationPolicy to modifications, but

So there needs to be a policy for specifically for modifying a topic. But it is confusing and error-prone if there are different policy classes for creation and modification (the CreateTopicPolicy and a new ModifyTopicPolicy, say): It would be easy for the code implementing a user's policies to get out of sync if it needs to be maintained in two places. It would also be easy to configure one policy but not the other. So it would be better if there were a single policy interface which is applied to both topic creation and modification.

Example 3

Reassigning replicas is another kind of topic modification and the subject of KIP-179. By similar reasoning to example 2 it, too, should be covered by the same policy.

How does this KIP relate to KIP-170?

Public Interfaces

A new policy interface will be added which properly can be applied uniformly to topic creation and modifications.

This policy will be configured via a new configuration key,

The existing policy interfaces CreateTopicPolicy and AlterConfigPolicy will be deprecated, but will continue to be applied where they are currently applied until they are removed.

Proposed Changes

Add TopicActionsPolicy

The following policy interface will be added

 * A policy that is enforced on actions affecting topics.
 * An implementation of this policy can be configured on a broker via the
 * {@code} broker config. 
 * When this is configured the named class will be instantiated reflectively 
 * using its nullary constructor and will then pass the broker configs to 
 * its <code>configure()</code> method. During broker shutdown, the 
 * <code>close()</code> method will be invoked so that resources can be 
 * released (if necessary).
interface TopicActionsPolicy extends Configurable, AutoCloseable {
    /** Enumerates possible actions on topics. */
    static enum Action {
        /** The creation of a topic. */

        /** The modification of a topic. */

        /** The deletion of a topic. */

     * Represents the state of a topic either before, or as a result of, an administrative request affecting the topic.
    static interface TopicState {
         * The number of partitions of the topic.
        public int numPartitions();

         * The replication factor of the topic.
        public Short replicationFactor();

         * The replica assignments of the topic.
        public Map<Integer, List<Integer>> replicasAssignments()

         * The topic config.
        public Map<String,String> configs();

     * Parameters for a request to perform an {@linkplain #action} on a {@linkplain #topic}
     * @see #validate(RequestMetadata)
    static interface RequestMetadata {

         * The {@linkplain Action action} being performed on the topic.
        public Action action();

         * The topic the {@linkplain #action() action} is being performed upon.
        public String topic();

         * The authenticated principal making the request, or null if the session is not authenticated.
        public KafkaPrincipal principal();

          * The state the topic has before the request.
          * <ul>
          * <li>For {@link Action#CREATE} this will be null.</li>
          * <li>For {@link Action#MODIFY} this will be the state the topic currently has (before the modification).</li>
          * <li>For {@link Action#DELETE} this will be the state of the topic which is going to be deleted.</li>
          * </ul>
        public TopicState preRequestState();

         * The state the topic will have after the request.
         * <ul>
         * <li>For {@link Action#CREATE} this will be the requested state of the topic to be created.</li>
         * <li>For {@link Action#MODIFY} this will be the state the topic will have after the modification.</li>
         * <li>For {@link Action#DELETE} this will be null.</li>
         * </ul>
        public TopicState postRequestState();


     * Validate the request parameters and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request parameters for the provided topic do not satisfy this policy.
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message. Note that validation
     * failure only affects the relevant topic, other topics in the request will still be processed.
     * @param requestMetadata the request parameters for the provided topic.
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
    void validate(RequestMetadata requestMetadata) throws PolicyViolationException;

This policy will be applied:

This will be configurable via the broker config.

Note: Unlike previous policy interfaces the inner RequestMetadata is an interface rather than a class. This should simplify testing and better permit use sites to, for example, lazily fetch metadata when it's actually required by the policy implementation, rather than eagerly fetch information which the policy didn't actually require.

What other policies might there be in future? delete records policy

Deprecate existing policies

The existing CreateTopicPolicy and AlterConfigPolicy will be deprecated, but will continue to be applied when they are configured.

Using or will result in an deprecation warning in the broker logs.

It will be a configuration time error if both and are used at the same time, or both and are used at the same time.

Internally, an adapter implementation of TopicActionsPolicy will be used when CreateTopicPolicy and AlterConfigPolicy are configured, so policy use sites won't be unnecessarily complicated.

If, in the future, AdminClient.alterConfigs()/AlterConfigsRequest is changed to support changing broker configs a separate policy interface can be applied to such changes.

Compatibility, Deprecation, and Migration Plan

Existing users will have to reimplement their policies in terms of the new TopicActionsPolicy interface, and reconfigure their brokers accordingly. Since the TopicActionsPolicy contains a superset of the existing information used by the deprecated policies such reimplementation should be trivial.

The deprecated policy interfaces and configuration keys will be removed in a future Kafka version. If this KIP is accepted for Kafka 1.1.0 this removal could happen in Kafka 1.2.0 or a later release.

Rejected Alternatives

The objectives of this KIP could be achieved without deprecating the existing policy classes, but that:

The proposed TopicActionsPolicy doesn't have to cover the topic deletion case: That could still be handled by a separate policy, but it is desirable to have a single policy to cover the whole lifecycle of a topic, and for the same information to be made available about a topic being deleted as about a topic being modified.

The proposed TopicActionsPolicy doesn't cover the use case of records being deleted from a topic. This is not the same as the modification of a topic, and would require a different policy interface. It might be appropriate to use the same topic state in such a policy interface, however.