...
A user can define a policy manager, similar to the pluggable Authorizer, by setting create.topics.policy.class.name in server.properties and implementing the CreateTopicPolicy interface. The interface will live in the clients jar under the org.apache.kafka.server.policy package. It extends Configurable so that implementations can act on broker configurations, and AutoCloseable so that resources can be released on shutdown.
```java
package org.apache.kafka.server.policy;

import java.util.List;
import java.util.Map;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.errors.InvalidRequestException;

public interface CreateTopicPolicy extends Configurable, AutoCloseable {
    void validate(TopicDetails topicDetails) throws InvalidRequestException;
}

// Declared in its own source file in the same package.
public class TopicDetails {
    private final String topic;
    private final int numPartitions;
    private final short replicationFactor;
    private final Map<Integer, List<Integer>> replicasAssignments;
    private final Map<String, String> configs;

    /** replicasAssignments is a map from partition id to broker ids */
    public TopicDetails(String topic, int numPartitions, short replicationFactor,
                        Map<Integer, List<Integer>> replicasAssignments,
                        Map<String, String> configs) {
        this.topic = topic;
        this.numPartitions = numPartitions;
        this.replicationFactor = replicationFactor;
        this.replicasAssignments = replicasAssignments;
        this.configs = configs;
    }
}
```
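To make the contract concrete, here is a minimal sketch of what a policy implementation might look like. It is self-contained, so it declares simplified stand-ins for CreateTopicPolicy, TopicDetails and InvalidRequestException rather than using the real classes. The accessor methods on TopicDetails, the MinReplicationPolicy class and the min.replication.factor key are all assumptions made for illustration; none of them are part of this proposal.

```java
import java.util.Collections;
import java.util.Map;

public class MinReplicationPolicySketch {

    // Stand-in for org.apache.kafka.common.errors.InvalidRequestException.
    static class InvalidRequestException extends RuntimeException {
        InvalidRequestException(String message) { super(message); }
    }

    // Simplified stand-in for the proposed TopicDetails.
    // The accessors are an assumption; the proposal only shows the constructor.
    static class TopicDetails {
        private final String topic;
        private final short replicationFactor;

        TopicDetails(String topic, short replicationFactor) {
            this.topic = topic;
            this.replicationFactor = replicationFactor;
        }

        String topic() { return topic; }
        short replicationFactor() { return replicationFactor; }
    }

    // Stand-in for the proposed interface, with Configurable's method inlined.
    interface CreateTopicPolicy extends AutoCloseable {
        void configure(Map<String, ?> configs);
        void validate(TopicDetails topicDetails) throws InvalidRequestException;
    }

    // Hypothetical policy: reject topics whose replication factor is too low.
    static class MinReplicationPolicy implements CreateTopicPolicy {
        private short minReplicationFactor = 1;

        @Override
        public void configure(Map<String, ?> configs) {
            // "min.replication.factor" is a made-up key used only in this sketch.
            Object value = configs.get("min.replication.factor");
            if (value != null) {
                minReplicationFactor = Short.parseShort(value.toString());
            }
        }

        @Override
        public void validate(TopicDetails details) {
            if (details.replicationFactor() < minReplicationFactor) {
                throw new InvalidRequestException("Topic " + details.topic()
                        + " needs a replication factor of at least " + minReplicationFactor);
            }
        }

        @Override
        public void close() {
            // Nothing to release in this sketch.
        }
    }

    public static void main(String[] args) {
        MinReplicationPolicy policy = new MinReplicationPolicy();
        policy.configure(Collections.singletonMap("min.replication.factor", "2"));
        policy.validate(new TopicDetails("orders", (short) 3)); // accepted
        try {
            policy.validate(new TopicDetails("scratch", (short) 1));
        } catch (InvalidRequestException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
        policy.close();
    }
}
```

The exception message is what a broker could surface to the client, which is why a policy should describe the violated rule rather than just fail.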
...
```
CreateTopics Request (Version: 1) => [create_topic_requests] timeout validateOnly (new)
  create_topic_requests => topic num_partitions replication_factor [replica_assignment] [configs]
    topic => STRING
    num_partitions => INT32
    replication_factor => INT16
    replica_assignment => partition_id [replicas]
      partition_id => INT32
      replicas => INT32
    configs => config_key config_value
      config_key => STRING
      config_value => STRING
  timeout => INT32
  validateOnly => BOOLEAN (new)

CreateTopics Response (Version: 1) => [topic_errors]
  topic_errors => topic error_code error_message (new)
    topic => STRING
    error_code => INT16
    error_message => NULLABLE_STRING (new)
```
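As an illustration of the new error_message field, the sketch below encodes a single topic_errors entry with a plain ByteBuffer. It assumes the usual Kafka protocol encodings: STRING is an INT16 length followed by UTF-8 bytes, and NULLABLE_STRING uses a length of -1 for null. The class and method names are hypothetical and this is not the broker's serialization code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class TopicErrorEncodingSketch {

    // STRING: INT16 length followed by the UTF-8 bytes.
    static void putString(ByteBuffer buffer, String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        buffer.putShort((short) bytes.length);
        buffer.put(bytes);
    }

    // NULLABLE_STRING: same as STRING, except null is written as length -1.
    static void putNullableString(ByteBuffer buffer, String value) {
        if (value == null) {
            buffer.putShort((short) -1);
        } else {
            putString(buffer, value);
        }
    }

    // topic_errors entry => topic error_code error_message
    static ByteBuffer encodeTopicError(String topic, short errorCode, String errorMessage) {
        int topicSize = 2 + topic.getBytes(StandardCharsets.UTF_8).length;
        int messageSize = 2 + (errorMessage == null
                ? 0
                : errorMessage.getBytes(StandardCharsets.UTF_8).length);
        ByteBuffer buffer = ByteBuffer.allocate(topicSize + 2 + messageSize);
        putString(buffer, topic);
        buffer.putShort(errorCode);
        putNullableString(buffer, errorMessage);
        buffer.flip(); // make the buffer readable from the start
        return buffer;
    }

    public static void main(String[] args) {
        // The error code here is a placeholder value chosen for the example.
        ByteBuffer withMessage = encodeTopicError("orders", (short) 1, "replication factor too low");
        ByteBuffer withoutMessage = encodeTopicError("orders", (short) 0, null);
        System.out.println("bytes with message: " + withMessage.remaining());
        System.out.println("bytes without message: " + withoutMessage.remaining());
    }
}
```

A nullable field lets successful topics omit the message entirely while failed topics carry the reason reported by the policy.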
Proposed Changes
During broker start-up, AdminManager will create a CreateTopicPolicy instance if create.topics.policy.class.name is defined. It will then pass the broker configs to the configure method.
When a create topics request is received, each topic will be processed in sequence. For each topic:
...
Note that a validation failure only affects the relevant topic; other topics in the request will still be processed. Also, validateOnly doesn't guarantee that topic creation will succeed, since errors could still occur during the actual creation process.
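The isolation property described above can be sketched as follows. The per-topic steps are not reproduced here, so this only illustrates two points: a policy rejection is recorded against the offending topic while the rest are still handled, and nothing is created when validateOnly is set. The Policy interface, the process method and the use of IllegalArgumentException are simplifications assumed for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PerTopicValidationSketch {

    // Hypothetical, simplified policy: throws IllegalArgumentException to reject a topic.
    interface Policy {
        void validate(String topic, int numPartitions);
    }

    // Returns topic -> error message (null when the topic passed validation).
    // Topics that pass are appended to 'created' unless validateOnly is true.
    static Map<String, String> process(Map<String, Integer> requestedTopics,
                                       Policy policy,
                                       boolean validateOnly,
                                       List<String> created) {
        Map<String, String> errors = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : requestedTopics.entrySet()) {
            try {
                policy.validate(entry.getKey(), entry.getValue());
                if (!validateOnly) {
                    created.add(entry.getKey()); // stands in for the actual creation
                }
                errors.put(entry.getKey(), null);
            } catch (IllegalArgumentException e) {
                // The failure is attached to this topic only; the loop continues.
                errors.put(entry.getKey(), e.getMessage());
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        Map<String, Integer> request = new LinkedHashMap<>();
        request.put("a", 1);
        request.put("b", 0);
        request.put("c", 3);

        Policy positivePartitions = (topic, numPartitions) -> {
            if (numPartitions < 1) {
                throw new IllegalArgumentException("numPartitions must be positive");
            }
        };

        List<String> created = new ArrayList<>();
        Map<String, String> errors = process(request, positivePartitions, false, created);
        System.out.println("errors: " + errors);
        System.out.println("created: " + created);
    }
}
```

In a real broker the creation step can itself fail after validation succeeds, which is why a successful validateOnly call is not a guarantee.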
During broker shutdown, CreateTopicPolicy.close will be invoked.
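The lifecycle described in this section (instantiate if configured, configure, close on shutdown) could look roughly like the sketch below. It uses plain reflection and a simplified stand-in interface. The loadPolicy helper and RecordingPolicy class are hypothetical, and this is not how AdminManager is actually implemented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PolicyLifecycleSketch {

    // Simplified stand-in for the proposed interface.
    public interface CreateTopicPolicy extends AutoCloseable {
        void configure(Map<String, ?> configs);

        @Override
        void close(); // narrowed so callers need not handle a checked exception
    }

    // Hypothetical policy that records the lifecycle calls it receives.
    public static class RecordingPolicy implements CreateTopicPolicy {
        public static final List<String> events = new ArrayList<>();

        @Override
        public void configure(Map<String, ?> configs) { events.add("configure"); }

        @Override
        public void close() { events.add("close"); }
    }

    // Returns null when no policy class is configured.
    static CreateTopicPolicy loadPolicy(Map<String, ?> brokerConfigs) {
        Object className = brokerConfigs.get("create.topics.policy.class.name");
        if (className == null) {
            return null;
        }
        try {
            CreateTopicPolicy policy = Class.forName(className.toString())
                    .asSubclass(CreateTopicPolicy.class)
                    .getDeclaredConstructor()
                    .newInstance();
            policy.configure(brokerConfigs); // the policy sees the broker configs
            return policy;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> brokerConfigs = new HashMap<>();
        brokerConfigs.put("create.topics.policy.class.name", RecordingPolicy.class.getName());

        CreateTopicPolicy policy = loadPolicy(brokerConfigs); // broker start-up
        // ... create topics requests would be validated here ...
        if (policy != null) {
            policy.close(); // broker shutdown
        }
        System.out.println(RecordingPolicy.events);
    }
}
```

Requiring a no-argument constructor and passing settings through configure keeps policy classes loadable by name alone, which is the same pattern other pluggable Kafka components follow.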
As described in the previous section, we are proposing one policy config/interface per supported request type. The main advantage is that we can add additional configs in a compatible manner, but it also allows for modular policy implementations. Implementors also have the option of using a single implementation class if they wish, but multiple configs would still have to be set.
...