Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • Changes to Wire Protocol:
    • Adds the following new Request/Response messages:
    • Modifies Metadata Request/Response to allowing polling for in-progress or complete admin operations. Added fields include:
      • Add the ability to request no topics with a null topics list
      • Boolean indicating if a topic is marked for deletion
      • Boolean indicating if a topic is an internal topic
      • Rack information (if not added by KIP-36 Rack aware replica assignment)
      • Boolean indicating if a broker is the controller

...

 

CreateTopics Response (Version: 0) => [topic_error_codes] 
  topic_error_codes => topic error_code 
    topic => STRING
    error_code => INT16

CreateTopicsResponse contains a map between topic and topic creation result error code (see New Protocol Errors). 

Response semantics:

  1. When a request hits the timeout, the topics that are not "complete" will have the TimeoutException error code. 
    • The topics that did complete successfully with have no error. 
Delete Topics Request (KAFKA-2946): (Voted and Planned for 0.10.1.0)

 

DeleteTopics Request (Version: 0) => [topics] timeout 
  topics => STRING
  timeout => INT32

DeleteTopicsRequest is a batch request to initiate topic deletion.

Request semantics:

  1. Must be sent to the controller broker
  2. If there are multiple instructions for the same topic in one request the extra request will be ingnored
    • This is because the list of topics is modeled server side as a set
    • Multiple deletes results in the same end goal, so handling this error for the user should be okay
  3. When requesting to delete a topic that does not exist, a an InvalidTopic error will be returned for that topic.
  4. When requesting to delete a topic that is already marked for deletion, the request will wait up to the timeout until the delete is "complete" and return as usual.
    • This is to avoid errors due to concurrent delete requests. The end result is the same, the topic is deleted.
  5. The principal must be authorized to the "Delete" Operation on the "Topic" resource to delete the topic. 
    • Unauthorized requests will receive a TopicAuthorizationException if they are authorized to the "Describe" Operation on the "Topic" resource
    • Otherwise they will receive an InvalidTopicException as if the topic does not exist. 
  6. Setting a timeout > 0 will allow the request to block until the delete is "complete" on the controller node.
    • Complete means the local topic metadata cache no longer contains the topic
      • The topic metadata is updated when the controller sends out update metadata requests to the brokers
    • If a timeout error occurs, the topic could still be deleted successfully at a later time. Its up to the client to query for the state at that point.
  7. Setting a timeout <= 0 will validate arguments and trigger the delete topics and return immediately. 
    • This is essentially the fully asynchronous mode we have in the Zookeeper tools today. 
    • The error code in the response will either contain an argument validation exception or a timeout exception. If you receive a timeout exception, because you asked for 0 timeout, you can assume the message was valid and the topic deletion was triggered. 
  8. The request is not transactional. 
    1. If an error occurs on one topic, the others could still be deleted.
    2. Errors are reported independently.

QA:

  • Why is DeleteTopicsRequest a batch request?
    • Scenarios where tools or admins want to delete many topics should be able to with fewer requests
    • Example: Removing all cluster topics
  • What happens if some topics error immediately? Will it return immediately? 
    • The request will block until all topics have either been deleted, errors, or the timeout has been hit
    • There is no "short circuiting" where 1 error stops the other topics from being deleted
  • Why have a timeout at all? Deletes could take a while?
    • True some deletes may take a while or never finish, however some admin tools may want extended blocking regardless. 
    • If you don't want any blocking setting a timeout of 0 works.
    • Future changes may make deletes much faster. See the Follow Up Changes section above.
  • Why implement "partial blocking" instead of fully async or fully consistent?
  • Why require the request to go to the controller?
    • The controller is responsible for the cluster metadata and its propagation 
    • See Request Forwarding below
Delete Topics Response

 

DeleteTopics Response (Version: 0) => [topic_error_codes] 
  topic_error_codes => topic error_code 
    topic => STRING
    error_code => INT16

DeleteTopicsResponse contains a map between topic and topic creation result error code (see New Protocol Errors). 

Response semantics:

  1. When a request hits the timeout, the topics that are not "complete" will have the TimeoutException error code. 
    • The topics that did complete successfully with have no error. 
Alter Topics Request
This request/response is a bit more complicated and less critical than some others. Therefore, It will be addressed toward the end of KIP-4.

ACL Admin Schema (KAFKA-3266)

Note: Some of this work/code overlaps with "KIP-50 - Move Authorizer to o.a.k.common package". KIP-4 does not change the Authorizer interface at all, but does provide java objects in "org.apache.kafka.common.security.auth" to be used in the protocol request/response classes. It also provides translations between the Java and Scala versions for server side compatibility with the Authorizer interface.

List ACLs Request

 

ListAcls Request (Version: 0) => principal resource 
principal => NULLABLE_STRING
resource => resource_type resource_name resource_type => INT8 resource_name => STRING
Request semantics:
  1. Can be sent to any broker
  2. If a non-null principal is provided the returned ACLs will be filtered by that principal, otherwise ACLs for all principals will be listed. 
  3. If a resource with a resource_type != -1 is provided ACLs will be filtered by that resource, otherwise ACLs for all resources will be listed.
  4. Any principal can list their own ACLs where the permission type is "Allow", Otherwise the principal must be authorized to the "All" Operation on the "Cluster" resource to list ACLs.
    • Unauthorized requests will receive a ClusterAuthorizationException
    • This avoids adding a new operation that an existing authorizer implementation may not be aware of.
    • This can be reviewed and further refined/restricted as a follow up ACLs review after this KIP. See Follow Up Changes.
  5. Requesting a resource or principal that does not have any ACLs will not result in an error, instead empty response list is returned
List ACLs Response

 

ListAcls Response (Version: 0) => [responses] error_code 
responses => resource [acls] resource => resource_type resource_name resource_type => INT8 resource_name => STRING acls => acl_principal acl_permission_type acl_host acl_operation acl_principal => STRING acl_permission_type => INT8 acl_host => STRING acl_operation => INT8 error_code => INT16
Alter ACLs Request

 

AlterAcls Request (Version: 0) => [requests] 
requests => resource [actions]
resource => resource_type resource_name
resource_type => INT8
resource_name => STRING
actions => action acl
action => INT8
acl => acl_principal acl_permission_type acl_host acl_operation
acl_principal => STRING
acl_permission_type => INT8
acl_host => STRING
acl_operation => INT8
Request semantics:
  1. Must be sent to the controller broker
  2. If there are multiple instructions for the same resource in one request an InvalidRequestException will be logged on the broker and a single error code for that resource will be returned to the client
    • This is because the list of requests is modeled server side as a map with resource as the key
  3. ACLs with a delete action will be processed first and the add action second.
    1. This is to prevent confusion about sort order and final state when a batch message is sent. 
    2. If an add request was processed first, it could be deleted right after.
    3. Grouping ACLs by their action allows batching requests to the authorizer via the Authorizer.addAcls and Authorizer.removeAcls calls.
  4. The request is not transactional. One failure wont stop others from running.
    1. If an error occurs on one action, the others could still be run.
    2. Errors are reported independently.
  5. The principal must be authorized to the "All" Operation on the "Cluster" resource to alter ACLs.
    • Unauthorized requests will receive a ClusterAuthorizationException
    • This avoids adding a new operation that an existing authorizer implementation may not be aware of.
    • This can be reviewed and further refined/restricted as a follow up ACLs review after this KIP. See Follow Up Changes.

QA:

  • Why doesn't this request have a timeout and implement any blocking like the CreateTopicsRequest?
    • The Authorizer implementation is synchronous and exposes no details about propagating the ACLs to other nodes. 
    • The best we can do in the existing implementation is call Authorizer.addAcls and Authorizer.removeAcls and hope the underlying implementation handles the rest.
  • What happens if there is an error in the Authorizer?
    • Currently the best we can do is log the error broker side and return a generic exception because there are no "standard" exceptions defined in the Authorizer interface to provide a more clear code
    • KIP-50 is tracking adding the standard exceptions
    • The Authorizer interface also provides no feedback about individual ACLs when added or deleted in a group
      • Authorizer.addAcls is a void function, the best we can do is return an error for all ACLs and let the user check the current state by listing the ACLs
      • Autohrizer.removeAcls is a boolean function,  the best we can do is return an error for all ACLs and let the user check the current state by listing the ACLs
      • Behavior here could vary drastically between implementations
      • I suggest this be addressed in KIP-50 as well, though it has some compatibility concerns. 
  • Why require the request to go to the controller?
    • The controller is responsible for the cluster metadata and its propagation 
    • This ensures one instance of the Authorizer sees all the changes and reduces concurrency issues, especially because the Authorizer interface exposes no details about propagating the ACLs to other nodes. 
    • See Request Forwarding below
Alter ACLs Response

 

AlterAcls Response (Version: 0) => [responses] 
responses => resource [results]
resource => resource_type resource_name
resource_type => INT8
resource_name => STRING
results => action acl error_code
action => INT8
acl => acl_principal acl_permission_type acl_host acl_operation
acl_principal => STRING
acl_permission_type => INT8
acl_host => STRING
acl_operation => INT8
error_code => INT16
 

2. Server-side Admin Request handlers

At the highest level, admin requests will be handled on the brokers the same way that all message types are. However, because admin messages modify cluster metadata they should be handled by the controller. This allows the controller to propagate the changes to the rest of the cluster.  However, because the messages need to be handled by the controller does not necessarily mean they need to be sent directly to the controller. A message forwarding mechanism can be used to forward the message from any broker to the correct broker for handling. 

...