

Status

Current state: Accepted

Discussion thread

JIRA: KAFKA-15230, KAFKA-15369

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Background

The KRaft controller was designed to be isolated from Kafka clients. This isolation helps prevent misbehaving clients from compromising the performance of the system. It also clarifies node roles: brokers are responsible for client traffic. However, there are certain edge cases where it is reasonable for clients to communicate with KRaft controllers.

Controllers as Bootstrap Servers

Sometimes, we would like to use the controller quorum in place of "bootstrap servers." While this is not recommended for most clients, there are certain Kafka clients for whom it might make sense. For example, a metrics plugin running on the controller itself may use a KafkaProducer to publish its records. It would be very helpful if the plugin could use the controller on which it is running as a bootstrap server, because this would avoid the need to supply broker hostnames and ports through a plugin configuration.

Controllers as Targets

Sometimes, we would like to target controllers directly. Typically this is so that we can perform an administrative operation without involving the brokers. DESCRIBE_QUORUM is a great example. This operation has nothing to do with the brokers, and may indeed be useful for debugging when other parts of the system are down. Another good example is using INCREMENTAL_ALTER_CONFIGS to make log4j level changes on a KRaft controller.

Proposed Changes

Overview

Controllers as Bootstrap Servers

New Kafka clients which support KIP-919 will be able to use KRaft controllers as bootstrap servers. This applies to all of the client types: consumers, producers, and admin clients. When using controllers as bootstrap servers, the broker endpoints that are returned will be those of the configured inter-broker listener.

It's worth noting here that we continue to recommend putting KRaft controllers on a separate network from Kafka clients. This is always a good idea in order to have the best security and isolation. Therefore, in a well-run cluster, only certain internal clients should use the KRaft controllers as bootstrap servers.

Controllers as Targets

New Kafka clients which support KIP-919 will also be able to target KRaft controllers directly. This only applies to admin clients, since it is not possible to produce or consume from a quorum controller. In this mode, the endpoints that are returned are the appropriate ones for the controller listener that was contacted.

Public Interfaces

Configuration

Controllers as Bootstrap Servers

No new client-side configuration will be required to use controllers as bootstrap servers.

Controllers as Targets

There will be a new AdminClient configuration, bootstrap.controllers. This configuration contains a comma-separated series of entries in the following form:

Code Block
[controller-id]@[hostname]:[port]

This format is the same as that of controller.quorum.voters. Indeed, the contents of that configuration can be copied to this one if desired.
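The following Java sketch shows how a bootstrap.controllers value in this format could be parsed. It is not Kafka's parser, and the class name ControllerBootstrapParser is hypothetical. It assumes host names contain no ':', so IPv6 literals are not handled. It also treats the "controller-id@" prefix as optional, because a later revision of this proposal drops the IDs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;

// Hypothetical helper (not a Kafka API) that parses a bootstrap.controllers value.
// Each comma-separated entry is "[controller-id]@[hostname]:[port]"; the
// "controller-id@" prefix is treated as optional in this sketch.
final class ControllerBootstrapParser {
    record Entry(OptionalInt controllerId, String host, int port) { }

    static List<Entry> parse(String value) {
        List<Entry> entries = new ArrayList<>();
        for (String raw : value.split(",")) {
            String entry = raw.trim();
            if (entry.isEmpty()) {
                continue;
            }
            OptionalInt id = OptionalInt.empty();
            int at = entry.indexOf('@');
            if (at >= 0) {
                // NumberFormatException (an IllegalArgumentException) on a bad ID.
                id = OptionalInt.of(Integer.parseInt(entry.substring(0, at)));
                entry = entry.substring(at + 1);
            }
            // Split on the last ':' so that only the port is taken from the end.
            int colon = entry.lastIndexOf(':');
            if (colon <= 0 || colon == entry.length() - 1) {
                throw new IllegalArgumentException("Expected host:port, got: " + raw);
            }
            String host = entry.substring(0, colon);
            int port = Integer.parseInt(entry.substring(colon + 1));
            if (port < 0 || port > 65535) {
                throw new IllegalArgumentException("Port out of range: " + raw);
            }
            entries.add(new Entry(id, host, port));
        }
        return entries;
    }
}
```

For example, parse("1@c1.example.com:9093,2@c2.example.com:9093") yields two entries with controller IDs 1 and 2.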

It is an error to set both bootstrap.controllers and bootstrap.servers. Only one can be set at a time.

When this configuration is specified, the AdminClient will talk directly to the controller quorum and the brokers will not be involved.

KafkaProducer and KafkaConsumer will not support bootstrap.controllers.
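The two rules above can be checked before a client is constructed. The sketch below is an illustrative assumption, not Kafka's actual configuration validation. BootstrapConfigValidator and ClientType are hypothetical names.

```java
import java.util.Properties;

// Hypothetical validation sketch, not Kafka's config-checking code.
final class BootstrapConfigValidator {
    enum ClientType { ADMIN, PRODUCER, CONSUMER }

    static void validate(Properties props, ClientType type) {
        boolean hasServers = isSet(props, "bootstrap.servers");
        boolean hasControllers = isSet(props, "bootstrap.controllers");
        // Rule 1: the two bootstrap settings are mutually exclusive.
        if (hasServers && hasControllers) {
            throw new IllegalArgumentException(
                "Only one of bootstrap.servers and bootstrap.controllers can be set.");
        }
        // Rule 2: producers and consumers cannot target the controllers.
        if (hasControllers && type != ClientType.ADMIN) {
            throw new IllegalArgumentException(
                "bootstrap.controllers is only supported by the admin client.");
        }
    }

    private static boolean isSet(Properties props, String key) {
        String value = props.getProperty(key);
        return value != null && !value.isBlank();
    }
}
```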

Command-line Changes

New Arguments

The following command-line tools will get a new --bootstrap-controllers argument:

  • kafka-acls.sh
  • kafka-cluster.sh
  • kafka-configs.sh
  • kafka-delegation-tokens.sh
  • kafka-features.sh
  • kafka-metadata-quorum.sh
  • kafka-metadata-shell.sh
  • kafka-reassign-partitions.sh

When the --bootstrap-controllers argument is used:

  • --bootstrap-server must not be specified.
  • The tool will only communicate with the controller quorum.

Changes to kafka-metadata-shell.sh

The metadata shell will now have these arguments:

Code Block
The Apache Kafka metadata tool

positional arguments:
  command                The command to run.

optional arguments:
  -h, --help             show this help message and exit
  --directory DIRECTORY, -d DIRECTORY
                         The __cluster_metadata-0 directory to read.
  --bootstrap-controllers CONTROLLERS, -q CONTROLLERS
                         The bootstrap.controllers, used to communicate directly with the metadata quorum.
  --config CONFIG        Path to a property file containing a Kafka configuration

Note that:

  • The --snapshot argument has been replaced by a --directory argument that reads the whole directory, not just a snapshot file.
  • There is no need for a --cluster-id flag, since we will query the controller for its cluster ID prior to creating the Raft client.
  • There is now a --config argument which can be used to pass a configuration file.

Since kafka-metadata-shell.sh is at an "evolving" level of interface stability, these changes should be OK to make without a deprecation period.

New Error Codes

...


Overview

AdminClient

New Kafka admin clients which support KIP-919 will be able to target KRaft controllers directly. Because we don't support producing or consuming on the controllers, this only applies to admin clients, and not to producers or consumers.

The choice to communicate directly with the controller quorum should not be taken lightly, because in some cases it bypasses safety checks built into the system. For example, when an incrementalAlterConfigs operation alters a broker configuration, the change is normally validated on the broker side first. When the RPC is sent directly to the controller quorum, this validation step does not occur, since the broker is not involved.

In order to make it clear what is going on, communicating directly with the controller quorum will require special configurations and command-line flags. If these are not passed, the communication will not be allowed. Similarly, we will not be able to talk to brokers when using these configurations and flags.

Controller Registration

After starting up, each Kafka controller will register itself with the active controller. The registration will include the controller's endpoints, whether it is ready to perform a ZK migration, and a randomly generated UUID that uniquely identifies this specific incarnation of the controller. The registration will be persisted in the metadata log as hard state.

Each controller will examine the registration for its own ID found in the metadata log. If that registration is not as expected, the controller will re-register. This check is necessary to handle cases where a delayed message from an older incarnation of controller N arrives after a more recent registration for controller N. Although we expect this to be quite rare, it is possible.
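Below is a minimal sketch of the re-registration check. It assumes a registration can be compared field by field with what the running controller would register now. Registration and needsReRegistration are hypothetical names, and endpoints are simplified to a map from listener name to host:port.

```java
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

// Hypothetical sketch of the re-registration check; names are illustrative only.
final class RegistrationChecker {
    // Simplified view of a controller registration as stored in the metadata log.
    record Registration(int controllerId, UUID incarnationId, boolean zkMigrationReady,
                        Map<String, String> endpoints) { }

    /**
     * Returns true if the registration stored for this controller's ID is missing or
     * differs from what the running controller would register now, for example
     * because a delayed request from an older incarnation overwrote a newer one.
     */
    static boolean needsReRegistration(Map<Integer, Registration> registrationsInLog,
                                       Registration current) {
        Registration stored = registrationsInLog.get(current.controllerId());
        return !Objects.equals(stored, current);
    }
}
```

Because the incarnation ID changes on every process start, a stale registration from an earlier run never compares equal to the current one.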

Security

From the perspective of security, nothing is changing. It is already possible for custom clients to send operations directly to the controller. The only new ability we are adding is the ability to get back metadata responses, and the information they contain is already present in the MetadataResponses returned by the brokers.

We will continue to enforce access controls on all operations performed by the controller by means of the Authorizer system. We also continue to recommend that users not be given direct network access to KRaft controllers, for example by putting the controllers on a separate network or by setting up a firewall.

Public Interfaces

bootstrap.controllers configuration

There will be a new AdminClient configuration, bootstrap.controllers. This configuration contains a comma-separated series of hostname:port entries. When this configuration is specified, the AdminClient will talk directly to the controller quorum and the brokers will not be involved.

KafkaProducer and KafkaConsumer will not support bootstrap.controllers. Only AdminClient will support it.

It is an error to set both bootstrap.controllers and bootstrap.servers. Only one can be set at a time. It is also an error to include broker endpoints in --bootstrap-controller. If we contact a broker via this mechanism, the command will fail.

Just as with bootstrap.servers, the supplied server list doesn't need to be exhaustive. As long as we can contact one of the provided controllers, the RPC can proceed.
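The sketch below illustrates both rules: reaching any controller in the list is enough, and reaching a broker is an error. The probe function stands in for real network I/O, and the names are hypothetical. The real client's node-selection order may also differ from this simple in-order scan.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch; not the AdminClient's actual bootstrap logic.
final class ControllerBootstrap {
    enum ProbeResult { UNREACHABLE, CONTROLLER, BROKER }

    static String chooseBootstrap(List<String> addresses, Function<String, ProbeResult> probe) {
        for (String address : addresses) {
            switch (probe.apply(address)) {
                case CONTROLLER:
                    // One reachable controller is enough; the list need not be exhaustive.
                    return address;
                case BROKER:
                    throw new IllegalStateException(
                        "MISMATCHED_ENDPOINT_TYPE: " + address + " is a broker, not a controller");
                default:
                    // UNREACHABLE: try the next address in the list.
                    break;
            }
        }
        throw new IllegalStateException("None of the bootstrap controllers could be reached");
    }
}
```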

Command-line Changes

New Arguments

The following command-line tools will get a new --bootstrap-controller argument:

  • kafka-acls.sh
  • kafka-cluster.sh
  • kafka-configs.sh
  • kafka-delegation-tokens.sh
  • kafka-features.sh
  • kafka-metadata-quorum.sh
  • kafka-metadata-shell.sh
  • kafka-reassign-partitions.sh

When the --bootstrap-controller argument is used, --bootstrap-server must not be specified.

The --bootstrap-controller flag will set the bootstrap.controllers configuration described above. It will also clear the bootstrap.servers configuration if that has been set in some other way (for example, via a configuration file provided to the command-line tool).
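Here is a minimal sketch of how a tool might merge these flags with properties loaded from a configuration file. It assumes the tool passes null for any flag that was not given. BootstrapFlagApplier is a hypothetical name and is not part of the Kafka tools.

```java
import java.util.Properties;

// Hypothetical helper, not part of the Kafka command-line tools.
final class BootstrapFlagApplier {
    static Properties apply(Properties fromConfigFile, String bootstrapServerFlag,
                            String bootstrapControllerFlag) {
        if (bootstrapServerFlag != null && bootstrapControllerFlag != null) {
            throw new IllegalArgumentException(
                "--bootstrap-server and --bootstrap-controller cannot both be specified.");
        }
        // Copy so that the caller's properties are left untouched.
        Properties result = new Properties();
        result.putAll(fromConfigFile);
        if (bootstrapServerFlag != null) {
            result.setProperty("bootstrap.servers", bootstrapServerFlag);
        }
        if (bootstrapControllerFlag != null) {
            result.setProperty("bootstrap.controllers", bootstrapControllerFlag);
            // The flag overrides any bootstrap.servers value that came from the config file.
            result.remove("bootstrap.servers");
        }
        return result;
    }
}
```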

The --bootstrap-controller flag will be documented as follows:

Code Block
   --bootstrap-controller CONTROLLERS
                         A comma-separated list of bootstrap.controllers that can be supplied instead of --bootstrap-server.
                         This is useful for administrators who wish to bypass the brokers.

Note that it is not necessary to specify the controller IDs when using --bootstrap-controller.

Here is an example --bootstrap-controller usage:

Code Block
./bin/kafka-cluster.sh cluster-id --bootstrap-controller example.com:9090,example2.com:9090,example3.com:9090

Changes to kafka-metadata-shell.sh

The metadata shell will now have these arguments:

Code Block
The Apache Kafka metadata tool

positional arguments:
  command                The command to run.

optional arguments:
  -h, --help             show this help message and exit
  --directory DIRECTORY, -d DIRECTORY
                         The __cluster_metadata-0 directory to read.
  --bootstrap-controller CONTROLLERS, -q CONTROLLERS
                         The bootstrap.controllers, used to communicate directly with the metadata quorum.
  --config CONFIG        Path to a property file containing a Kafka configuration

Note that:

  • The --snapshot argument has been replaced by a --directory argument that reads the whole directory, not just a snapshot file.
  • There is no need for a --cluster-id flag, since we will query the controller for its cluster ID prior to creating the Raft client.
  • There is now a --config argument which can be used to pass a configuration file.

Since kafka-metadata-shell.sh is at an "evolving" level of interface stability, these changes should be OK to make without a deprecation period.

New Errors

There will be a new MISMATCHED_ENDPOINT_TYPE error.

Code Block
MISMATCHED_ENDPOINT_TYPE([next], "The request was sent to an endpoint of the wrong type.", MismatchedEndpointTypeException::new),

There will also be a new UNSUPPORTED_ENDPOINT_TYPE error.

Code Block
UNSUPPORTED_ENDPOINT_TYPE([next], "This endpoint type is not supported yet.", UnsupportedEndpointTypeException::new),

There will also be a new UNKNOWN_CONTROLLER_ID error.

Code Block
UNKNOWN_CONTROLLER_ID([next], "This controller ID is not known.", UnknownControllerIdException::new),

MetadataRequest Changes

There will be a new version of MetadataRequest. It will contain an additional field, TargetKRaftControllerQuorum. This field defaults to false.

...

DescribeCluster Changes

When bootstrap.controllers is set, the AdminClient will use DescribeClusterRequest rather than MetadataRequest to obtain the cluster topology.

We will add a new EndpointType field to DescribeClusterRequest. It will be set to 2 (controllers) when bootstrap.controllers is in use. Otherwise, it will be set to 1 (brokers).

If the provided endpoint type does not match the actual endpoint type, we will return the MISMATCHED_ENDPOINT_TYPE error. For example, sending a request to a broker with an EndpointType of 2 (controllers) will result in this error.

Code Block
diff --git a/clients/src/main/resources/common/message/DescribeClusterRequest.json b/clients/src/main/resources/common/message/DescribeClusterRequest.json
index 192e4d87d44..d1a4f432533 100644
--- a/clients/src/main/resources/common/message/DescribeClusterRequest.json
+++ b/clients/src/main/resources/common/message/DescribeClusterRequest.json
@@ -18,10 +18,12 @@
   "type": "request",
   "listeners": ["zkBroker", "broker"],
   "name": "DescribeClusterRequest",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "IncludeClusterAuthorizedOperations", "type": "bool", "versions": "0+",
-      "about": "Whether to include cluster authorized operations." }
+      "about": "Whether to include cluster authorized operations." },
+    { "name": "EndpointType", "type": "int8", "versions": "1+", "default": "1",
+      "about": "The endpoint type to describe. 1=brokers, 2=controllers." }
   ]
 }

There will be a corresponding new version of DescribeClusterResponse.

Code Block
diff --git a/clients/src/main/resources/common/message/DescribeClusterResponse.json b/clients/src/main/resources/common/message/DescribeClusterResponse.json
index 1cd26c3d3c1..d774137d080 100644
--- a/clients/src/main/resources/common/message/DescribeClusterResponse.json
+++ b/clients/src/main/resources/common/message/DescribeClusterResponse.json
@@ -17,7 +17,7 @@
   "apiKey": 60,
   "type": "response",
   "name": "DescribeClusterResponse",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
@@ -26,20 +26,22 @@
       "about": "The top-level error code, or 0 if there was no error" },
     { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
       "about": "The top-level error message, or null if there was no error." },
+    { "name": "EndpointType", "type": "int8", "versions": "1+", "default": "1",
+      "about": "The endpoint type. 1=brokers, 2=controllers." },

The EndpointType field will still be populated when there is an error, such as a MISMATCHED_ENDPOINT_TYPE error.

When EndpointType is "controllers", the ControllerId field will be set to the ID of the current active KRaft controller, or -1 if there is no current active KRaft controller.

If the MetadataVersion is too old to support controller registrations and EndpointType was passed as "controllers", the controller will return UNSUPPORTED_ENDPOINT_TYPE. This reflects the fact that it has no metadata about the controller endpoints in these older MetadataVersions.
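The checks described in this section can be summarized in a short sketch. It follows the stated rules but is not Kafka's request handler, and the constant and method names are hypothetical.

```java
// Hypothetical sketch of the server-side EndpointType checks for DescribeCluster v1.
final class DescribeClusterEndpointCheck {
    static final byte BROKERS = 1;
    static final byte CONTROLLERS = 2;

    enum Result { NONE, MISMATCHED_ENDPOINT_TYPE, UNSUPPORTED_ENDPOINT_TYPE }

    static Result check(byte requestedEndpointType, boolean servedByController,
                        boolean metadataVersionHasControllerRegistrations) {
        byte actualEndpointType = servedByController ? CONTROLLERS : BROKERS;
        if (requestedEndpointType != actualEndpointType) {
            // e.g. EndpointType=2 sent to a broker, or EndpointType=1 sent to a controller.
            return Result.MISMATCHED_ENDPOINT_TYPE;
        }
        if (servedByController && !metadataVersionHasControllerRegistrations) {
            // Older MetadataVersions carry no controller endpoint metadata.
            return Result.UNSUPPORTED_ENDPOINT_TYPE;
        }
        return Result.NONE;
    }
}
```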

ControllerRegistrationRequest / Response

There will be a new ControllerRegistrationRequest. All controllers will send this to the active controller.

Code Block
{
  "apiKey": ...,
  "type": "request",
  "name": "ControllerRegistrationRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ControllerId", "type": "int32", "versions": "0+",
      "about": "The ID of the controller to register." },
    { "name": "ActiveControllerEpoch", "type": "int32", "versions": "0+",
      "about": "The epoch of the current active controller." },
    { "name": "IncarnationId", "type": "uuid", "versions": "0+",
      "about": "The controller incarnation ID, which is unique to each process run." },
    { "name": "ZkMigrationReady", "type": "bool", "versions": "0+",
      "about": "Set if the required configurations for ZK migration are present." },
    { "name": "Listeners", "type": "[]Listener",
      "about": "The listeners of this controller", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The name of the endpoint." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The hostname." },
      { "name": "Port", "type": "uint16", "versions": "0+",
        "about": "The port." },
      { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
        "about": "The security protocol." }
    ]
    },
    { "name": "Features", "type": "[]Feature",
      "about": "The features on this controller", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The feature name." },
      { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported feature level." },
      { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported feature level." }
    ]
    }
  ]
}

and a corresponding ControllerRegistrationResponse:

Code Block
{
  "apiKey": ...,
  "type": "response",
  "name": "ControllerRegistrationResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The response error code." },
    { "name": "ErrorMessage", "type": "string", "nullableVersions": "0+", "versions": "0+",
      "about": "The response error message, or null if there was no error." }
  ]
}

The active controller will persist all registrations that are sent with the correct permissions (CLUSTER_ACTION on the CLUSTER resource). If the controller that receives the request is not active, it will send back a NOT_CONTROLLER error.

The active controller may also return STALE_CONTROLLER_EPOCH if the wrong epoch was passed.
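The following sketch shows one possible validation order on the active controller. It assumes the authorization check runs first and that a failed check yields CLUSTER_AUTHORIZATION_FAILED; this proposal does not name that error. The in-memory map is a hypothetical stand-in for writing a RegisterControllerRecord to the metadata log, and all names are illustrative.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch of ControllerRegistrationRequest handling; not Kafka's code.
final class ControllerRegistrationHandler {
    enum Result { NONE, CLUSTER_AUTHORIZATION_FAILED, NOT_CONTROLLER, STALE_CONTROLLER_EPOCH }

    private final boolean isActive;
    private final int activeControllerEpoch;
    // Stand-in for RegisterControllerRecords persisted in the metadata log.
    final Map<Integer, UUID> persistedIncarnations = new HashMap<>();

    ControllerRegistrationHandler(boolean isActive, int activeControllerEpoch) {
        this.isActive = isActive;
        this.activeControllerEpoch = activeControllerEpoch;
    }

    Result handle(boolean authorizedForClusterAction, int controllerId,
                  int requestActiveControllerEpoch, UUID incarnationId) {
        if (!authorizedForClusterAction) {
            return Result.CLUSTER_AUTHORIZATION_FAILED;
        }
        if (!isActive) {
            return Result.NOT_CONTROLLER;
        }
        if (requestActiveControllerEpoch != activeControllerEpoch) {
            return Result.STALE_CONTROLLER_EPOCH;
        }
        persistedIncarnations.put(controllerId, incarnationId);
        return Result.NONE;
    }
}
```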

ApiVersionsResponse

The ZkMigrationReady field in ApiVersionsResponse is now deprecated, and won't be filled out.

Code Block
diff --git a/clients/src/main/resources/common/message/ApiVersionsResponse.json b/clients/src/main/resources/common/message/ApiVersionsResponse.json
index 9fda953e10e..eb449f07c54 100644
--- a/clients/src/main/resources/common/message/ApiVersionsResponse.json
+++ b/clients/src/main/resources/common/message/ApiVersionsResponse.json
@@ -70,8 +70,6 @@
           "about": "The cluster-wide finalized min version level for the feature."}
       ]
     },
-    { "name":  "ZkMigrationReady", "type": "bool", "versions": "3+", "taggedVersions": "3+",
-      "tag": 3, "ignorable": true, "default": "false",
-      "about": "Set by a KRaft controller if the required configurations for ZK migration are present" }
+    { "name":  "ZkMigrationReady", "type": "deprecated", "versions": "3+", "taggedVersions": "3+", "tag": 3 }
   ]
 }

RegisterControllerRecord

The data from the registration request will be written to a new RegisterControllerRecord.

Code Block
{
  "apiKey": ...,
  "type": "metadata",
  "name": "RegisterControllerRecord",
  "validVersions": "0+",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ControllerId", "type": "int32", "versions": "0+",
      "about": "The controller id." },
    { "name": "IncarnationId", "type": "uuid", "versions": "0+",
      "about": "The incarnation ID of the controller process" },
    { "name": "ZkMigrationReady", "type": "bool", "versions": "0+",
      "about": "Set if the required configurations for ZK migration are present." },
    { "name": "EndPoints", "type": "[]ControllerEndpoint", "versions": "0+",
      "about": "The endpoints that can be used to communicate with this controller.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname." },
        { "name": "Port", "type": "uint16", "versions": "0+",
          "about": "The port." },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol." }
    ]},
    { "name": "Features", "type": "[]ControllerFeature",
      "about": "The features on this controller", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The feature name." },
      { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported feature level." },
      { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported feature level." }
    ]}
  ]
}

At the moment, there will be no incremental version of this record, since there is no equivalent to broker fencing and unfencing on the controller. There is also no epoch – the incarnation ID, chosen by the controller itself, is what we use.

AdminClient Changes

As described above, AdminClient will now support bootstrap.controllers in addition to bootstrap.servers. The following APIs will be supported:

API | Notes
alterConfigs |
createAcls |
deleteAcls |
describeCluster |
describeAcls |
describeConfigs |
describeClientQuotas |
alterClientQuotas |
incrementalAlterConfigs | This can be used to alter log4j settings on inactive controllers. That is the one case where we don't send the RPC to the active controller.
describeDelegationToken |
electLeaders |
alterPartitionReassignments |
listPartitionReassignments |
describeUserScramCredentials |
describeFeatures |
updateFeatures |
describeMetadataQuorum |
unregisterBroker | This affects brokers, not controllers.

When an unsupported API is used with bootstrap.controllers, the UNSUPPORTED_ENDPOINT_TYPE error is returned.
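As an illustration, this gate amounts to a membership check against the table above. ControllerBootstrapApiGate is a hypothetical name, and the Admin method names are used as plain strings for brevity.

```java
import java.util.Set;

// Hypothetical sketch of the API gate applied when bootstrap.controllers is in use.
final class ControllerBootstrapApiGate {
    // APIs listed in the table above as supported with bootstrap.controllers.
    static final Set<String> SUPPORTED = Set.of(
        "alterConfigs", "createAcls", "deleteAcls", "describeCluster", "describeAcls",
        "describeConfigs", "describeClientQuotas", "alterClientQuotas",
        "incrementalAlterConfigs", "describeDelegationToken", "electLeaders",
        "alterPartitionReassignments", "listPartitionReassignments",
        "describeUserScramCredentials", "describeFeatures", "updateFeatures",
        "describeMetadataQuorum", "unregisterBroker");

    // Returns the error name the client would surface, or "NONE".
    static String check(String apiName, boolean usingBootstrapControllers) {
        if (usingBootstrapControllers && !SUPPORTED.contains(apiName)) {
            return "UNSUPPORTED_ENDPOINT_TYPE";
        }
        return "NONE";
    }
}
```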

Controller Changes

Fix compatibility gates

Previously, the controller used ApiVersionsResponse messages obtained from the other controllers to determine:

  • whether all controllers in the cluster supported a proposed change to the MetadataVersion
  • during a ZK migration, whether all controllers in the cluster were ready to exit premigration

This method was flawed, because it relied on the idea that controllers were always connected to each other. In reality, after the Raft election concludes, the only communication that takes place is inactive controllers connecting to the active controller. The other connections are gradually closed, and we lose information about those other nodes.

With the introduction of controller registrations, the active controller will simply use this information instead. This will be more accurate than the old system.
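Below is a minimal sketch of both gates, driven by controller registrations instead of ApiVersionsResponses. It assumes that a controller with no registration blocks the change, which this proposal does not spell out. MetadataVersionGate and FeatureRange are hypothetical names, and the feature levels in the example are illustrative.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of registration-based compatibility gates on the active controller.
final class MetadataVersionGate {
    record FeatureRange(short min, short max) { }

    // True only if every controller has registered a range that includes proposedLevel.
    static boolean allControllersSupport(Set<Integer> controllerIds,
                                         Map<Integer, Map<String, FeatureRange>> registeredFeatures,
                                         String feature, short proposedLevel) {
        for (int id : controllerIds) {
            Map<String, FeatureRange> features = registeredFeatures.get(id);
            if (features == null) {
                return false; // Assumption: an unregistered controller blocks the change.
            }
            FeatureRange range = features.get(feature);
            if (range == null || proposedLevel < range.min() || proposedLevel > range.max()) {
                return false;
            }
        }
        return true;
    }

    // True only if every controller registered with ZkMigrationReady set.
    static boolean allReadyToExitPremigration(Set<Integer> controllerIds,
                                              Map<Integer, Boolean> zkMigrationReady) {
        for (int id : controllerIds) {
            if (!zkMigrationReady.getOrDefault(id, false)) {
                return false;
            }
        }
        return true;
    }
}
```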

New APIs

The following RPCs will now be supported on the controller:

API | Notes
DESCRIBE_CONFIGS | Even on inactive controllers, this can be used to alter the log4j settings dynamically.
DESCRIBE_CLUSTER | As described above, DESCRIBE_CLUSTER will be used by AdminClient for bootstrapping.
REGISTER_CONTROLLER |

Compatibility, Deprecation, and Migration Plan

Client Compatibility Matrix


Client setting | pre-KIP-919 broker | post-KIP-919 broker | pre-KIP-919 controller | post-KIP-919 controller
bootstrap.servers | Success. | Success. | Fails because the METADATA request is not supported. | Fails because the METADATA request is not supported.
bootstrap.controllers | Fails because DESCRIBE_CLUSTER v1 is not supported. | Fails because DESCRIBE_CLUSTER returns MISMATCHED_ENDPOINT_TYPE. | Fails because DESCRIBE_CLUSTER v1 is not supported. | Fails with UNSUPPORTED_ENDPOINT_TYPE if the metadata version is too old to support controller registrations; otherwise, success.
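The bootstrap.controllers row of the matrix follows from one observation: only a post-KIP-919 node understands DESCRIBE_CLUSTER v1, so only such a node can answer with MISMATCHED_ENDPOINT_TYPE or UNSUPPORTED_ENDPOINT_TYPE. The following minimal Java sketch encodes the matrix as a pure function. The enum and class names are hypothetical and exist only to make each cell explicit. The sketch does not model any real client code path.

```java
/** Which client setting is used to reach the cluster. */
enum BootstrapConfig { BOOTSTRAP_SERVERS, BOOTSTRAP_CONTROLLERS }

/** The kind of node the client happens to contact. */
enum NodeKind { PRE_KIP_919_BROKER, POST_KIP_919_BROKER, PRE_KIP_919_CONTROLLER, POST_KIP_919_CONTROLLER }

/** One value per distinct cell in the compatibility matrix. */
enum Outcome {
    SUCCESS,
    METADATA_NOT_SUPPORTED,
    DESCRIBE_CLUSTER_V1_NOT_SUPPORTED,
    MISMATCHED_ENDPOINT_TYPE,
    UNSUPPORTED_ENDPOINT_TYPE
}

final class ClientCompatibility {
    private ClientCompatibility() {}

    static Outcome outcome(BootstrapConfig config, NodeKind node, boolean mvSupportsControllerRegistrations) {
        return switch (config) {
            // bootstrap.servers: the client bootstraps with METADATA, which controllers do not support.
            case BOOTSTRAP_SERVERS -> switch (node) {
                case PRE_KIP_919_BROKER, POST_KIP_919_BROKER -> Outcome.SUCCESS;
                case PRE_KIP_919_CONTROLLER, POST_KIP_919_CONTROLLER -> Outcome.METADATA_NOT_SUPPORTED;
            };
            // bootstrap.controllers: the client bootstraps with DESCRIBE_CLUSTER v1.
            case BOOTSTRAP_CONTROLLERS -> switch (node) {
                case PRE_KIP_919_BROKER, PRE_KIP_919_CONTROLLER -> Outcome.DESCRIBE_CLUSTER_V1_NOT_SUPPORTED;
                case POST_KIP_919_BROKER -> Outcome.MISMATCHED_ENDPOINT_TYPE;
                case POST_KIP_919_CONTROLLER -> mvSupportsControllerRegistrations
                        ? Outcome.SUCCESS
                        : Outcome.UNSUPPORTED_ENDPOINT_TYPE;
            };
        };
    }

    public static void main(String[] args) {
        // Print every cell of the matrix, assuming a recent enough metadata version.
        for (BootstrapConfig c : BootstrapConfig.values()) {
            for (NodeKind n : NodeKind.values()) {
                System.out.println(c + " -> " + n + ": " + outcome(c, n, true));
            }
        }
    }
}
```

Because DESCRIBE_CLUSTER carries a top-level error code, every failing cell in this row can be reported to the user as a specific error rather than as a dropped connection.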

Controller Registration MetadataVersion Gate

Controller registrations will be gated behind a new metadata version. The controllers will not attempt to register themselves if the current metadata version is too old.
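A controller-side sketch of this gate in Java follows. It assumes the controller is notified each time it observes the metadata.version feature level. The ControllerRegistrationGate class and its methods are hypothetical. The minimum level is a constructor parameter because this sketch does not assume which MetadataVersion introduces registrations.

```java
/**
 * Hypothetical helper that decides when a controller should send its registration.
 * The minimum feature level is supplied by the caller rather than hard-coded.
 */
final class ControllerRegistrationGate {
    private final short minLevelForRegistration;
    private boolean registered = false;

    ControllerRegistrationGate(short minLevelForRegistration) {
        this.minLevelForRegistration = minLevelForRegistration;
    }

    /** Called whenever the controller observes a (possibly new) metadata.version level. */
    boolean shouldRegister(short currentLevel) {
        if (registered) {
            return false; // already registered; nothing more to send
        }
        // If the level is too old, do not attempt registration; wait for a later upgrade.
        return currentLevel >= minLevelForRegistration;
    }

    /** Called once the active controller has accepted the registration. */
    void markRegistered() {
        registered = true;
    }

    public static void main(String[] args) {
        ControllerRegistrationGate gate = new ControllerRegistrationGate((short) 10);
        System.out.println(gate.shouldRegister((short) 9));  // false: version too old
        System.out.println(gate.shouldRegister((short) 10)); // true: registrations supported
        gate.markRegistered();
        System.out.println(gate.shouldRegister((short) 11)); // false: already registered
    }
}
```

In this sketch the check runs on every observed level. A controller that starts on an old metadata version would therefore register as soon as the cluster is upgraded past the gate.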

Rejected Alternatives

bootstrap.controllers versus direct.to.controller configuration

Rather than having a bootstrap.controllers configuration, we could have a separate configuration like direct.to.controller and put the controller servers into bootstrap.servers. Similarly, we could reuse --bootstrap-server rather than adding --bootstrap-controller.

We decided to go with the scheme proposed above because it makes it clearer when a tool is going directly to the controller. It also makes it clearer which command-line tools have this capability and which do not.

For example, kafka-console-consumer.sh does not have the capability to go directly to the controller, since the controller does not serve consumer traffic. Therefore, it is intuitive that kafka-console-consumer.sh lacks the --bootstrap-controller flag.

Another issue is that in the future, we may want to support using the controllers as bootstrap servers for the brokers. The scheme above leaves the door open for this, whereas a scheme that reused existing configurations would not. 

Extending MetadataRequest instead of using DescribeClusterRequest

Instead of extending DescribeClusterRequest, we could have extended MetadataRequest so that the AdminClient could send it to the controllers. However, MetadataResponse does not include a top-level error code, so we could not cleanly handle the many compatibility scenarios described above. We would have had no choice but to terminate the TCP connection, which would leave the user guessing what the problem was.

Integration with Raft

When we implement dynamic KRaft quorum reconfiguration, we will want to store information about Raft voters in the Raft log itself. This will allow the quorum to change and grow beyond the initial static configuration. So it's reasonable to ask if there is some overlap between the reconfiguration project and this one.

However, I think these two KIPs should be separate. The registration information QuorumController cares about is different from what Raft will care about. For example, Raft will not care about the currently active features, or whether ZK migration is ready.

Future Work

Bootstrap-to-broker

In the future, we might want to allow the controllers to be used as bootstrap servers for the brokers. This would be helpful, for example, in cases where a plugin running on the controller itself wanted to create a consumer or producer, without hard-coding broker addresses in the configuration.

This is a separate use case from the direct-to-controller one, so it probably needs a different configuration.

One major problem is what broker endpoints to return. Perhaps we could always return the inter-broker endpoints in the response. However, it's unclear how the client should proceed when the controller has different security settings than the selected broker endpoints. This might require more complex client configuration.

Better API for returning topic data

MetadataRequest has many limitations.

  • It unconditionally returns broker information, even when we don't care about that (such as in AdminClient)
  • Its response does not incorporate a top-level error code, so there is no reasonable way to indicate failure
  • All requested topics are returned in one big lump, which makes the garbage collector choke in larger clusters

In the future, we should have a better API for listing topic data that does not have these problems. This would also give us a clean way to implement Admin#listTopics when using bootstrap.controllers.
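To make the listed limitations concrete, here is a minimal Java sketch of what a paginated topic-listing call could look like. Everything in it is hypothetical: the ListTopicsPage and TopicListing records, the cursor scheme (the last topic name returned), and the placeholder error code are illustrations, not an existing Kafka RPC. It addresses the three points in turn: the response carries no broker information, has a top-level error code, and is bounded in size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical per-topic entry: no broker or replica details. */
record TopicListing(String name, int partitionCount) {}

/** Hypothetical response page: top-level error code, bounded list, and a resume cursor. */
record ListTopicsPage(short errorCode, List<TopicListing> topics, String nextCursor) {}

final class PaginatedTopicLister {
    static final short NONE = 0;
    static final short HYPOTHETICAL_INVALID_PAGE_SIZE = 1; // placeholder, not a real Kafka error code

    /** Returns up to maxResults topics whose names sort strictly after the cursor. */
    static ListTopicsPage listPage(NavigableMap<String, Integer> topics, String cursor, int maxResults) {
        if (maxResults <= 0) {
            // The failure is reported through the top-level error code, not by dropping the connection.
            return new ListTopicsPage(HYPOTHETICAL_INVALID_PAGE_SIZE, List.of(), null);
        }
        NavigableMap<String, Integer> remaining = cursor == null ? topics : topics.tailMap(cursor, false);
        List<TopicListing> page = new ArrayList<>();
        String last = null;
        for (Map.Entry<String, Integer> e : remaining.entrySet()) {
            if (page.size() == maxResults) {
                // More topics remain, so hand back a cursor for the next request.
                return new ListTopicsPage(NONE, page, last);
            }
            page.add(new TopicListing(e.getKey(), e.getValue()));
            last = e.getKey();
        }
        return new ListTopicsPage(NONE, page, null); // no cursor: this was the last page
    }

    public static void main(String[] args) {
        NavigableMap<String, Integer> topics = new TreeMap<>(Map.of("a", 1, "b", 2, "c", 3));
        String cursor = null;
        do {
            ListTopicsPage page = listPage(topics, cursor, 2);
            System.out.println(page.topics());
            cursor = page.nextCursor();
        } while (cursor != null);
    }
}
```

Using the last returned name as an exclusive cursor keeps the server stateless between pages. The trade-off is that topics created or deleted between requests may be seen or missed, which a real design would need to address.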