Comment: Cover topic and message deletion

...

As existing tools are migrated to use the AdminClient APIs rather than interacting directly with ZooKeeper, we need to apply policies to them, but the existing policy interfaces make it difficult to do this consistently.

...

Problem 1 - Topic config is governed by both CreateTopicPolicy and AlterConfigPolicy

Currently the topic config is passed to the CreateTopicPolicy, but if a topic config is later modified the AlterConfigPolicy is applied. If an administrator wants to use the topic config in their policy decisions, they have to implement this logic in two places. If the policy decision depends on both the topic config and another aspect of the topic, the AlterConfigPolicy interface doesn't provide the necessary information.

...

Problem 2 - Creating more partitions is not currently covered by a policy

Changing the number of partitions in a topic was the subject of KIP-195 and is just one kind of topic modification. Consider two example use cases:

...

So there needs to be a policy specifically for modifying a topic. But it would be confusing and error-prone to have different policy classes for creation and modification (the CreateTopicPolicy and a new ModifyTopicPolicy, say): the code implementing a user's policies could easily get out of sync if it had to be maintained in two places, and it would be easy to configure one policy but not the other. It would therefore be better to have a single policy interface which is applied to both topic creation and modification.

...

Problem 3 - CreateTopicPolicy can govern partition assignment, but there is no policy for reassignment

Reassigning replicas is another kind of topic modification and the subject of KIP-179. By reasoning similar to that in Problem 2, it too should be covered by the same policy.

How does this KIP relate to KIP-170?

Problem 4 - There is no policy for topic deletion or message deletion

KIP-170 proposes a policy for topic deletion (see that KIP for the motivation behind this) and KIP-204 proposes to add an AdminClient API for the existing network protocol for deleting messages from the partitions of a topic.

There is little point in adding a policy for topic deletion if there is no policy for message deletion, since deleting all the messages from a topic is, in most cases, practically equivalent to deleting the topic itself. If there were a separate TopicDeletePolicy and MessageDeletePolicy, we would have a problem similar to the one described above for separate topic creation and modification policies: it would be unnecessarily difficult and tedious to keep the policies consistent.

Public Interfaces

Two new policy interfaces will be added:

  • TopicActionsPolicy will apply to topic creation and alteration and be configured by the new topic.actions.policy.class.name config.
  • TopicDeletionPolicy will apply to message and topic deletion and be configured by the new topic.deletion.policy.class.name config.

The existing policy interfaces CreateTopicPolicy and AlterConfigPolicy will be deprecated, but will continue to be applied where they are currently applied until they are removed.

Proposed Changes

Add TopicActionsPolicy, TopicDeletionPolicy and supporting interfaces

The following policy interfaces and supporting classes will be added:

Code Block
languagejava
linenumberstrue
/**
 * Represents the state of a topic either before, or as a result of,
 * an administrative request affecting a topic.
 */
interface TopicState {
    /**
     * The number of partitions of the topic.
     */
    int numPartitions();

    /**
     * The replication factor of the topic.
     */
    Short replicationFactor();

    /**
     * The replica assignments of the topic.
     */
    Map<Integer, List<Integer>> replicasAssignments();

    /**
     * The topic config.
     */
    Map<String,String> configs();

    /**
     * Returns whether the topic is marked for deletion.
     */
    boolean markedForDeletion();

}


/** The current state of the topics in the cluster, before the request takes effect. */
interface ClusterState {
    /**
     * Returns the current state of the given topic, or null if the topic does not exist.
     */
    TopicState topicState(String topicName);

    /**
     * Returns all the topics in the cluster, including internal topics if
     * {@code includeInternal} is true, and including those marked for deletion
     * if {@code includeMarkedForDeletion} is true.
     */
    Set<String> topics(boolean includeInternal, boolean includeMarkedForDeletion);
}

/**
 * A policy that is enforced on topic creation and alteration.
 * An implementation of this policy can be configured on a broker via the
 * {@code topic.actions.policy.class.name} broker config.
 * When this is configured the named class will be instantiated reflectively
 * using its nullary constructor and will then pass the broker configs to
 * its <code>configure()</code> method. During broker shutdown, the
 * <code>close()</code> method will be invoked so that resources can be
 * released (if necessary).
 *
 * @see TopicDeletionPolicy for the policy for deleting messages and topics.
 */
interface TopicActionsPolicy extends Configurable, AutoCloseable {

    /**
     * Parameters for a request to {@linkplain #isCreate() create} or
     * {@linkplain #isAlter() alter} the given {@linkplain #topic() topic}.
     *
     * @see #validate(RequestMetadata, ClusterState)
     */
    static interface RequestMetadata {

        /**
         * Returns true if the request is for the creation of the given {@link #topic()}.
         */
        public boolean isCreate();

        /**
         * Returns true if the request is for the alteration of the given {@link #topic()}.
         */
        public boolean isAlter();

        /**
         * The topic the action is being performed upon.
         */
        public String topic();

        /**
         * The authenticated principal making the request, or null if the session is not authenticated.
         */
        public KafkaPrincipal principal();

        /**
         * The state the topic will have after the request.
         * <ul>
         * <li>When {@link #isCreate()} is true, this will be the requested state of the topic to be created.</li>
         * <li>When {@link #isAlter()} is true, this will be the state the topic will have after the alteration.</li>
         * </ul>
         */
        public TopicState requestedState();

    }

    /**
     * Validate the request parameters and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request parameters for the provided topic do not satisfy this policy.
     *
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
     * Note that validation failure only affects the relevant topic;
     * other topics in the request will still be processed.
     *
     * @param requestMetadata the request parameters for the provided topic.
     * @param clusterState the current state of the cluster
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    public void validate(RequestMetadata requestMetadata, ClusterState clusterState) throws PolicyViolationException;
}

/**
 * A policy that is enforced on message or topic deletion.
 * An implementation of this policy can be configured on a broker via the
 * {@code topic.deletion.policy.class.name} broker config.
 * When this is configured the named class will be instantiated reflectively
 * using its nullary constructor and will then pass the broker configs to
 * its {@link #configure(Map)} method. During broker shutdown, the
 * {@link #close()} method will be invoked so that resources can be
 * released (if necessary).
 *
 * @see TopicActionsPolicy for the policy for creating and altering topics.
 */
interface TopicDeletionPolicy extends Configurable, AutoCloseable {
    /**
     * Parameters for a request to delete {@linkplain #isMessageDeletion() messages} from the topic
     * or {@linkplain #isTopicDeletion() delete the entire topic}.
     *
     * @see #validate(RequestMetadata, ClusterState)
     */
    static interface RequestMetadata {

        /**
         * The topic that is the subject of the deletion.
         */
        public String topic();

        /**
         * The authenticated principal making the request, or null if the session is not authenticated.
         */
        public KafkaPrincipal principal();

        /**
         * Returns true if the topic itself is being deleted, or false if
         * the topic is not being deleted but zero or more records from
         * one or more of the topic's partitions are being deleted.
         * This is mutually exclusive with {@link #isMessageDeletion()}.
         */
        boolean isTopicDeletion();

        /**
         * Returns true if zero or more records from
         * one or more of the topic's partitions are being deleted, but the topic itself is
         * not being deleted.
         * This is mutually exclusive with {@link #isTopicDeletion()}.
         */
        boolean isMessageDeletion();

        /**
         * Returns a map of topic partitions and the corresponding offset of the first message
         * to be retained. Messages before this offset will be deleted.
         * Partitions which won't have messages deleted won't be present in the map.
         * When {@link #isTopicDeletion()} is true then all of the topic's partitions will be
         * present in the map and all the offsets will be {@link Long#MAX_VALUE}.
         */
        public Map<Integer, Long> deletedMessageOffsets();

    }

    /**
     * Validate the request parameters and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request parameters for the provided topic do not satisfy this policy.
     *
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
     * Note that validation failure only affects the relevant topic;
     * other topics in the request will still be processed.
     *
     * @param requestMetadata the request parameters for the provided topic.
     * @param clusterState the current state of the cluster
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validate(RequestMetadata requestMetadata, ClusterState clusterState) throws PolicyViolationException;
}
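To make the intent of a single policy for creation and alteration more concrete, here is a minimal sketch of how an implementation might look. It is only an illustration: the interfaces are trimmed local stand-ins for the ones proposed above (the real ones would also extend Configurable and AutoCloseable and take a ClusterState), and MaxPartitionsPolicy, SimpleRequest, PolicyViolation and the limit of 10 partitions are hypothetical names and values chosen for the example.

```java
import java.util.Map;

// Trimmed local stand-ins for the proposed interfaces; not the real Kafka types.
interface TopicState {
    int numPartitions();
    Map<String, String> configs();
}

interface RequestMetadata {
    boolean isCreate();
    boolean isAlter();
    String topic();
    TopicState requestedState();
}

// Stand-in for Kafka's PolicyViolationException.
class PolicyViolation extends RuntimeException {
    PolicyViolation(String message) {
        super(message);
    }
}

// Hypothetical policy. The same checks guard creation and alteration because
// both are described by the state the topic would have after the request.
class MaxPartitionsPolicy {
    private final int maxPartitions;

    MaxPartitionsPolicy(int maxPartitions) {
        this.maxPartitions = maxPartitions;
    }

    void validate(RequestMetadata request) {
        TopicState requested = request.requestedState();
        if (requested.numPartitions() > maxPartitions) {
            throw new PolicyViolation("Topic " + request.topic() + " would have "
                    + requested.numPartitions() + " partitions, the limit is " + maxPartitions);
        }
        if ("-1".equals(requested.configs().get("retention.ms"))) {
            throw new PolicyViolation("Topic " + request.topic() + " must not use unlimited retention");
        }
    }
}

// Minimal request used only to exercise the sketch; it doubles as its own TopicState.
class SimpleRequest implements RequestMetadata, TopicState {
    private final String topic;
    private final boolean create;
    private final int partitions;
    private final Map<String, String> configs;

    SimpleRequest(String topic, boolean create, int partitions, Map<String, String> configs) {
        this.topic = topic;
        this.create = create;
        this.partitions = partitions;
        this.configs = configs;
    }

    public boolean isCreate() { return create; }
    public boolean isAlter() { return !create; }
    public String topic() { return topic; }
    public TopicState requestedState() { return this; }
    public int numPartitions() { return partitions; }
    public Map<String, String> configs() { return configs; }
}

class ActionsPolicyDemo {
    public static void main(String[] args) {
        MaxPartitionsPolicy policy = new MaxPartitionsPolicy(10);
        // A creation request within the limit passes.
        policy.validate(new SimpleRequest("orders", true, 6, Map.of()));
        try {
            // An alteration to 50 partitions is rejected by the same rule.
            policy.validate(new SimpleRequest("orders", false, 50, Map.of()));
        } catch (PolicyViolation e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Because the rule is written against requestedState(), nothing in validate() needs to branch on isCreate() or isAlter(); an implementation would only do so if it wanted different limits for the two cases.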

The TopicActionsPolicy will be applied:

  • On topic creation
  • On topic modification
    • Change in topic config, via AdminClient.alterConfigs() (this change is made as part of this KIP).
    • Adding partitions to topics, via AdminClient.createPartitions() (see KIP-195, but this change is made as part of this KIP).
    • Reassigning partitions to brokers, and/or changing the replication factor via AdminClient.reassignPartitions() (see KIP-179).
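For the modification cases listed above, a policy may want to compare the topic's current state (from ClusterState) with the state it would have after the request. The following is a rough sketch of that idea under stated assumptions: the interfaces are cut-down local stand-ins for the proposed ones, and GradualChangePolicy, FixedState, AlterRequest, PolicyViolation and the "at most double the partitions" rule are hypothetical and exist only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Cut-down local stand-ins for the proposed interfaces; not the real Kafka types.
interface TopicState {
    int numPartitions();
    short replicationFactor();
}

interface ClusterState {
    TopicState topicState(String topicName);  // null if the topic does not exist
}

interface RequestMetadata {
    boolean isAlter();
    String topic();
    TopicState requestedState();
}

// Stand-in for Kafka's PolicyViolationException.
class PolicyViolation extends RuntimeException {
    PolicyViolation(String message) {
        super(message);
    }
}

// Immutable TopicState used to exercise the sketch.
class FixedState implements TopicState {
    private final int partitions;
    private final short replicationFactor;

    FixedState(int partitions, int replicationFactor) {
        this.partitions = partitions;
        this.replicationFactor = (short) replicationFactor;
    }

    public int numPartitions() { return partitions; }
    public short replicationFactor() { return replicationFactor; }
}

// An alteration request carrying the state the topic would have afterwards.
class AlterRequest implements RequestMetadata {
    private final String topic;
    private final TopicState requested;

    AlterRequest(String topic, TopicState requested) {
        this.topic = topic;
        this.requested = requested;
    }

    public boolean isAlter() { return true; }
    public String topic() { return topic; }
    public TopicState requestedState() { return requested; }
}

// Hypothetical policy: never lower the replication factor, and never more
// than double the partition count in a single request.
class GradualChangePolicy {
    void validate(RequestMetadata request, ClusterState cluster) {
        TopicState current = cluster.topicState(request.topic());
        if (!request.isAlter() || current == null) {
            return;  // no earlier state to compare with
        }
        TopicState requested = request.requestedState();
        if (requested.replicationFactor() < current.replicationFactor()) {
            throw new PolicyViolation("Replication factor of " + request.topic() + " may not be lowered");
        }
        if (requested.numPartitions() > 2 * current.numPartitions()) {
            throw new PolicyViolation("Partitions of " + request.topic() + " may at most double per request");
        }
    }
}

class GradualChangeDemo {
    public static void main(String[] args) {
        Map<String, TopicState> topics = new HashMap<>();
        topics.put("orders", new FixedState(4, 3));
        ClusterState cluster = topics::get;

        GradualChangePolicy policy = new GradualChangePolicy();
        policy.validate(new AlterRequest("orders", new FixedState(8, 3)), cluster);  // accepted
        try {
            policy.validate(new AlterRequest("orders", new FixedState(9, 3)), cluster);
        } catch (PolicyViolation e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

The same validate() call would serve a config change, a partition increase or a reassignment, since each reaches the policy as a requested TopicState.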

...

The TopicDeletionPolicy will be applied on topic and message deletion.
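As an illustration of why topic deletion and message deletion are handled by one policy, the sketch below protects a set of topics from both kinds of deletion in a single place. It is a simplified example rather than a definitive implementation: RequestMetadata here is a local stand-in for the proposed TopicDeletionPolicy.RequestMetadata (without the principal), and ProtectedTopicsPolicy, DeletionRequest, PolicyViolation and the "audit." prefix are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Local stand-in for the proposed TopicDeletionPolicy.RequestMetadata.
interface RequestMetadata {
    String topic();
    boolean isTopicDeletion();
    boolean isMessageDeletion();
    Map<Integer, Long> deletedMessageOffsets();
}

// Stand-in for Kafka's PolicyViolationException.
class PolicyViolation extends RuntimeException {
    PolicyViolation(String message) {
        super(message);
    }
}

// Hypothetical policy: topics with a protected prefix can neither be deleted
// nor have any of their messages deleted.
class ProtectedTopicsPolicy {
    private final String protectedPrefix;

    ProtectedTopicsPolicy(String protectedPrefix) {
        this.protectedPrefix = protectedPrefix;
    }

    void validate(RequestMetadata request) {
        if (!request.topic().startsWith(protectedPrefix)) {
            return;  // unprotected topics are not restricted by this policy
        }
        if (request.isTopicDeletion()) {
            throw new PolicyViolation("Topic " + request.topic() + " is protected and cannot be deleted");
        }
        if (request.isMessageDeletion() && !request.deletedMessageOffsets().isEmpty()) {
            throw new PolicyViolation("Messages in protected topic " + request.topic() + " cannot be deleted");
        }
    }
}

// Minimal request implementation used only to exercise the sketch.
class DeletionRequest implements RequestMetadata {
    private final String topic;
    private final boolean topicDeletion;
    private final Map<Integer, Long> offsets;

    private DeletionRequest(String topic, boolean topicDeletion, Map<Integer, Long> offsets) {
        this.topic = topic;
        this.topicDeletion = topicDeletion;
        this.offsets = offsets;
    }

    // Topic deletion: every partition is present with offset Long.MAX_VALUE.
    static DeletionRequest topicDeletion(String topic, int partitionCount) {
        Map<Integer, Long> offsets = new HashMap<>();
        for (int partition = 0; partition < partitionCount; partition++) {
            offsets.put(partition, Long.MAX_VALUE);
        }
        return new DeletionRequest(topic, true, offsets);
    }

    static DeletionRequest messageDeletion(String topic, Map<Integer, Long> offsets) {
        return new DeletionRequest(topic, false, offsets);
    }

    public String topic() { return topic; }
    public boolean isTopicDeletion() { return topicDeletion; }
    public boolean isMessageDeletion() { return !topicDeletion; }
    public Map<Integer, Long> deletedMessageOffsets() { return offsets; }
}

class DeletionPolicyDemo {
    public static void main(String[] args) {
        ProtectedTopicsPolicy policy = new ProtectedTopicsPolicy("audit.");
        policy.validate(DeletionRequest.topicDeletion("scratch", 3));  // accepted
        try {
            policy.validate(DeletionRequest.messageDeletion("audit.logins", Map.of(0, 100L)));
        } catch (PolicyViolation e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

With two separate policy classes, the prefix check would have to be duplicated and kept in sync; here a protected topic cannot be emptied through message deletion while topic deletion is blocked.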

Note: Unlike previous policy interfaces, the inner RequestMetadata is an interface rather than a class. This should simplify testing, and it lets the caller fetch metadata lazily, only when the policy implementation actually asks for it, rather than eagerly fetching information the policy never uses.
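The lazy-fetching point can be sketched as follows. This is only one possible approach, assuming the caller wraps the expensive lookup in a Supplier; LazyRequestMetadata, LazyMetadataDemo and the two example checks are hypothetical, and the interfaces are cut-down local stand-ins for the proposed ones.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Cut-down local stand-ins for the proposed interfaces; not the real Kafka types.
interface TopicState {
    int numPartitions();
}

interface RequestMetadata {
    String topic();
    TopicState requestedState();
}

// Hypothetical implementation that computes the requested state on first use
// and caches it. Not thread-safe; a real caller would need to decide whether
// that matters.
class LazyRequestMetadata implements RequestMetadata {
    private final String topic;
    private final Supplier<TopicState> loader;
    private TopicState cached;

    LazyRequestMetadata(String topic, Supplier<TopicState> loader) {
        this.topic = topic;
        this.loader = loader;
    }

    public String topic() {
        return topic;
    }

    public TopicState requestedState() {
        if (cached == null) {
            cached = loader.get();  // the expensive lookup runs only here
        }
        return cached;
    }
}

class LazyMetadataDemo {
    // Example check that only needs the topic name.
    static boolean nameAllowed(RequestMetadata request) {
        return !request.topic().startsWith("_");
    }

    // Example check that needs the requested state.
    static boolean sizeAllowed(RequestMetadata request) {
        return request.requestedState().numPartitions() <= 10;
    }

    public static void main(String[] args) {
        AtomicInteger loads = new AtomicInteger();
        RequestMetadata request = new LazyRequestMetadata("orders", () -> {
            loads.incrementAndGet();  // count how often the state is fetched
            return () -> 6;           // TopicState with 6 partitions
        });

        nameAllowed(request);
        System.out.println("Loads after name check: " + loads.get());
        sizeAllowed(request);
        sizeAllowed(request);
        System.out.println("Loads after size checks: " + loads.get());
    }
}
```

The same property helps in tests: a policy can be exercised with a small hand-written RequestMetadata instead of constructing a concrete class that carries every field.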

Deprecate existing policies

...