Status
Current state: Under discussion
Discussion thread:
JIRA:
-
KAFKA-4591Getting issue details...
STATUS
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
A number of Admin APIs have been/will be introduced as part of KIP-4. A subset of these Admin APIs allow the user to create, update or delete cluster resources: create topics, delete topics, alter topics, alter ACLs and alter configs. Create and delete topics were added to Kafka in 0.10.0.0 while the others are still in progress. These APIs are a major step towards self-serve Kafka clusters where application developers can, for example, create topics without having to go through the admins/operations team and without access to ZooKeeper.
There is, however, no way to validate the user request in order to restrict the operation parameters. The operations team may want to enforce that the replication factor, min.insync.replicas and/or retention settings for a topic are within an allowable range for example. In this KIP, we limit ourselves to validation of the create topic request, but the approach can be used for other request types in the future.
Public Interfaces
A user can define a policy manager similar to the pluggable Authorizer by setting create.topics.policy.class.name
in server.properties
and implementing the the CreateTopicPolicy
interface. The interface will live in the clients jar under the org.apache.kafka.server.policy
package.
package org.apache.kafka.server.policy; import org.apache.kafka.common.Configurable; public interface CreateTopicPolicy extends Configurable, AutoCloseable { void validate(TopicDetails topicDetails) throws InvalidRequestException; } public class TopicDetails { private final String topic; private final int numPartitions; private final short replicationFactor; private final Map<Integer, List<Integer>> replicasAssignments; private final Map<String, String> configs; /** replicasAssignment is a map from partition id to broker ids */ public TopicDetails(String topic, int numPartitions, short replicationFactor, Map<Integer, List<Integer>> replicasAssignments, Map<String, String> configs) { this.topic = topic; this.numPartitions = numPartitions; this.replicationFactor = replicationFactor; this.replicasAssignments = replicasAssignments; this.configs = configs; } }
Users will have to ensure that the policy implementation code is in the broker's classpath. The constructor of TopicDetails is public to make testing convenient for users. Under normal circumstances, it should only be instantiated by Kafka. We chose to create separate API classes instead of reusing request classes to make it easier to evolve the latter.
The fact that topic creation can now fail due to custom policies raises a couple of new requirements at the protocol level:
- We need to be able to send error messages back to the client, so we introduce an
error_message
string field in thetopic_errors
array of theCreateTopics
response. - For tools that allow users to create topics, a validation/dry-run mode where validation errors are reported but no creation is attempted is useful. A precedent for this exists in the Connect REST API. We introduce a
validateOnly
boolean field in theCreateTopics
request to enable this mode.
Both request and response versions are bumped to 1.
CreateTopics Request (Version: 1) => [create_topic_requests] timeout validateOnly (new) create_topic_requests => topic num_partitions replication_factor [replica_assignment] [configs] topic => STRING num_partitions => INT32 replication_factor => INT16 replica_assignment => partition_id [replicas] partition_id => INT32 replicas => INT32 configs => config_key config_value config_key => STRING config_value => STRING timeout => INT32 validateOnly => BOOLEAN (new) CreateTopics Response (Version: 1) => [topic_errors] topic_errors => topic error_code error_message (new) topic => STRING error_code => INT16 error_message => NULLABLE_STRING (new)
Proposed Changes
AdminManager will instantiate a CreateTopicPolicy
instance if create.topics.policy.class.name
is defined. When a create topics request is received, it will process each topic in sequence. For each topic:
- It will first perform the existing hardcoded request parameters validation (
numPartitions
andreplicationFactor
cannot be used at the same time asreplicaAssignments)
followed byCreateTopicPolicy.
validate
(if defined). - If validation fails, the
INVALID_REQUEST
error code and error message will be added to the response (for this topic). Note that the error message will only be included if the request version is greater than or equal to 1. - If validation succeeds and
validateOnly
is true, we return the NONE error code for the topic without attempting topic creation. IfvalidateOnly
is false, topic creation will be attempted as usual.
Note that validation failure only affects the relevant topic, other topics in the request will still be processed. Also, it's worth mentioning that validateOnly
doesn't guarantee that topic creation will succeed. Errors could still occur during the actual creation process.
As described in the previous section, we are proposing one policy config/interface per supported request type. The main advantage is that we can add additional configs in a compatible manner, but it also allows for modular policy implementations. Implementors also have the option of using a single implementation class if they wish, but multiple configs would still have to be set.
Compatibility, Deprecation, and Migration Plan
There is no compatibility impact as there is no change in behaviour unless the new config is used.
Future Work
- As we add new policy interfaces, it could make sense to introduce an
admin.policy.class.name
config so that one could provide a single implementation for multiple policies without having to set multiple configs. Not clear if the cost is worth the benefit, however. - Classloader isolation: it may make sense to load user supplied implementations in a separate classloader to avoid dependency version clashes. This is currently being explored in the context of Connect and it may make sense to extend it to all user supplied code in the broker (including Authorizers).
Rejected Alternatives
- A single config for an implementation of an interface with multiple
validate
methods: there would be no way to add methods without breaking binary compatibility until we move to Java 8. - A single config for an implementation of an abstract class with multiple
validate
methods, each with an empty implementation by default: this makes it easier to evolve when compared to rejected alternative 1, but provides less flexibility/modularity to implementors. - Restrict it via configs instead of pluggable interfaces: it would be clunky to provide configs for every requirement that users may have.
- Extending Authorizer to perform validation: even though this could work, it's not particularly intuitive.