Current state: Under Discussion

Discussion thread


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).



The KRaft controller was designed to be isolated from Kafka clients. This isolation helps prevent misbehaving clients from compromising the performance of the system. It also clarifies node roles: brokers are responsible for client traffic. However, there are certain edge cases where it is reasonable for clients to communicate with KRaft controllers.

Sometimes, we would like to target controllers directly. Typically this is so that we can perform an administrative operation without involving the brokers. DESCRIBE_QUORUM is a great example. This operation has nothing to do with the brokers, and may indeed be useful for debugging when other parts of the system are down. Another good example is using INCREMENTAL_ALTER_CONFIGS to make log4j level changes on a KRaft controller.

Proposed Changes

New Kafka clients which support KIP-919 will be able to target KRaft controllers directly. This only applies to admin clients, since it is not possible to produce or consume from a quorum controller.

Public Interfaces

bootstrap.controllers configuration

There will be a new AdminClient  configuration, bootstrap.controllers . This configuration contains a comma-separated series of hostname:port entries. When this configuration is specified, the AdminClient will talk directly to the controller quorum and the brokers will not be involved.

KafkaProducer and KafkaConsumer will not support bootstrap.controllers. Only AdminClient  will support it.

It is an error to set both bootstrap.controllers  and bootstrap.servers . Only one can be set at a time. It is also an error to include broker endpoints in --bootstrap-controllers . If we contact a broker via this mechanism, the command will fail.

Just as with bootstrap.servers, the supplied server list doesn't need to be exhaustive. As long as we can contact one of the provided controllers the RPC can proceed. 

Command-line Changes

New Arguments

The following command-line tools will get a new --bootstrap-controllers argument:



When the --bootstrap-controllers  argument is used --bootstrap-servers must not be specified.

The --bootstrap-controllers  flag will set the bootstrap.controllers configuration described above. It will also clear the bootstrap.servers configuration if that has been set in some other way (for example, via a configuration file provided to the command-line tool).

The --bootstrap-controllers flag will be documented as follows:

   --bootstrap-controllers CONTROLLERS
                         A comma-separted list of bootstrap.controllers that can be supplied instead of boostrap-servers.
                         This is useful for administrators who wish to bypass the brokers.

Changes to

The metadata shell will now have these arguments:

The Apache Kafka metadata tool

positional arguments:
  command                The command to run.

optional arguments:
  -h, --help             show this help message and exit
  --directory DIRECTORY, -d DIRECTORY
                         The __cluster_metadata-0 directory to read.
  --bootstrap-controllers CONTROLLERS, -q CONTROLLERS
                         The bootstrap.controllers, used to communicate directly with the metadata quorum.
  --config CONFIG        Path to a property file containing a Kafka configuration

Note that:

  • The --snapshot  argument has been replaced by a --directory  argument that reads the whole directory, not just a snapshot file
  • There is no need for a --cluster-id  flag, since we will query the controller for its cluster ID prior to creating the Raft client.
  • There is now a --config  argument which can be used to pass a configuration file.

Since  is at an "evolving" level of interface stability, these changes should be OK to make without a deprecation period.

MetadataRequest Changes

There will be a new version of MetadataRequest. It will contain an additional field, DirectToKRaftControllerQuorum . This field defaults to false.

diff --git a/clients/src/main/resources/common/message/MetadataRequest.json b/clients/src/main/resources/common/message/MetadataRequest.json
index 5da95cfed6..8e5e765d11 100644
--- a/clients/src/main/resources/common/message/MetadataRequest.json
+++ b/clients/src/main/resources/common/message/MetadataRequest.json
@@ -18,7 +18,7 @@
   "type": "request",
   "listeners": ["zkBroker", "broker"],
   "name": "MetadataRequest",
-  "validVersions": "0-12",
+  "validVersions": "0-13",
   "flexibleVersions": "9+",
   "fields": [
     // In version 0, an empty array indicates "request metadata for all topics."  In version 1 and
@@ -50,6 +50,8 @@
     { "name": "IncludeClusterAuthorizedOperations", "type": "bool", "versions": "8-10",
       "about": "Whether to include cluster authorized operations." },
     { "name": "IncludeTopicAuthorizedOperations", "type": "bool", "versions": "8+",
-      "about": "Whether to include topic authorized operations." }
+      "about": "Whether to include topic authorized operations." },
+    { "name": "DirectToKRaftControllerQuorum", "type": "bool", "versions": "13+",
+      "about": "Whether to target the KRaft controller quorum." }
diff --git a/clients/src/main/resources/common/message/MetadataResponse.json b/clients/src/main/resources/common/message/MetadataResponse.json
index 928d905152..085c0d919f 100644
--- a/clients/src/main/resources/common/message/MetadataResponse.json
+++ b/clients/src/main/resources/common/message/MetadataResponse.json
@@ -42,7 +42,7 @@
   // Version 11 deprecates ClusterAuthorizedOperations. This is now exposed
   // by the DescribeCluster API (KIP-700).
   // Version 12 supports topicId.
-  "validVersions": "0-12",
+  "validVersions": "0-13",
   "flexibleVersions": "9+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
@@ -94,6 +94,8 @@
         "about": "32-bit bitfield to represent authorized operations for this topic." }
     { "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "8-10", "default": "-2147483648",
-      "about": "32-bit bitfield to represent authorized operations for this cluster." }
+      "about": "32-bit bitfield to represent authorized operations for this cluster." },
+    { "name": "FromKRaftController", "type": "bool", "versions": "13+", "default": "false",
+      "taggedVersions": "13+", "tag": 0, "about": "Whether the response was sent back from a KRaft controller." }

DirectToKRaftControllerQuorum Handling Matrix

DirectToKRaftControllerQuorumIf Received by BrokerIf Received by KRaft Controller
falsetraditional broker MetadataResponseUNSUPPORTED_VERSION MetadataResponse
trueINVALID_REQUEST MetadataResponsecontroller MetadataResponse described below

KRaft Controller MetadataRequest

When sending a METADATA request to the controller, the following request fields must be set to false:

  • AllowAutoTopicCreation
  • IncludeClusterAuthorizedOperations
  • IncludeTopicAuthorizedOperations

If any of them are set to true, an INVALID_REQUEST MetadataResponse will be returned.

The controller will return information about the specified topics, or about all topics if requested.

The client making the request must have DESCRIBE permission on the CLUSTER resource.

KRaft Controller MetadataResponse

The KRaft controller MetadataResponse will always set:

  • FromKRaftController to true
  • ClusterId to the Kafka cluster ID
  • The ControllerId to the true Raft leader ID

Response typeTopics Section"Brokers" SectionComments
Successful response from KRaft controllerThe topic data, if any was requested.Controller endpoint information as given in controller.quorum.voters The "direct to controller" case.
Error response from if topics were given in requestthe given topics, with the expected error codeEmpty
Error response if no topics were given in requestthe __cluster_metadata topic with the expected error codeEmptyThere is no top-level error code in MetadataResponse, so we use the __cluster_metadata topic to send back our error.

AdminClient Implementation Notes

When talking to the controllers directly, the AdminClient needs to always send its RPCs to the active controller.

There is one exception: configuring ephemeral log4j settings with incrementalAlterConfigs must be done by sending them to the specified controller node.

Compatibility, Deprecation, and Migration Plan

From a security and isolation point of view, we continue to recommend putting KRaft controllers on a separate network from Kafka clients. This is always a good idea in order to have the best security and isolation.

From a competiability point of view, there should be no impact since current controllers don't handle MetadataRequest.

Only new AdminClient implementations with KIP-919 will be able to take advantage of this feature. When attempting to talk to controllers without KIP-919 support, the calls will complete with UnsupportedVersionException, reflecting the fact that the controllers don't support KIP-919.

Rejected Alternatives

bootstrap.controllers versus configuration

Rather than having a bootstrap.controllers  configuration, we could have a separate configuration like  and put the controller servers into bootstrap.servers . Similarly, we could reuse --bootstrap-server erather than adding --bootstrap-controllers.

We decided to go with the scheme proposed above to make it clearer when a tool was going directly to the controller. This also makes it clearer which command-line tools have this capability and which do not.

For example,  does not have the capability to go direct to the controller, since the controller does not handle produces. Therefore, it's intuitive that lacks the --bootstrap-controllers flag.

Another issue is that in the future, we may want to support using the controllers as bootstrap servers for the brokers. The scheme above leaves the door open for this, whereas a scheme that reused existing configurations would not. 

Future Work

In the future, we might want to allow the controllers to be used as bootstrap servers for the brokers. This would be helpful, for example, in cases where a plugin running on the controller itself wanted to create a consumer or producer, without hard-coding broker addresses in the configuration.

This is a separate use-case from the direct-to-controller one, so probably needs different configuration.

One major problem is what broker endpoints to return. Perhaps we could always return the inter-broker endpoints in the response. However, it's unclear how the client should proceed when the controller has different security settings than the selected broker endpoints. This might require more complex client configuration.

  • No labels