Table of Contents |
---|
Status
Current state: Under DiscussionAccepted
Discussion thread:
JIRA:
Jira | ||||||
---|---|---|---|---|---|---|
|
Jira server ASF JIRA serverId 5aa69414-a9e9-3523-82ec-879b028fb15b key KAFKA-15369
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
It is an error to set both bootstrap.controllers
and bootstrap.servers
. Only one can be set at a time. It is also an error to include broker endpoints in --bootstrap-controllerscontroller
. If we contact a broker via this mechanism, the command will fail.
...
The following command-line tools will get a new --bootstrap-controllers
argumentcontroller
argument:
- kafka-acls.sh
- kafka-cluster.sh
- kafka-configs.sh
- kafka-delegation-tokens.sh
- kafka-features.sh
- kafka-metadata-quorum.sh
kafka-metadata-shell.sh
- kafka-reassign-partitions.sh
When the --bootstrap-controllers
controller
argument is used --bootstrap-servers
must not be specified.
The --bootstrap-controllerscontroller
flag will set the bootstrap.controllers
configuration described above. It will also clear the bootstrap.servers
configuration if that has been set in some other way (for example, via a configuration file provided to the command-line tool).
The --bootstrap-controllers
flag controller
flag will be documented as follows:
Code Block |
---|
--bootstrap-controllerscontroller CONTROLLERS A comma-separted list of bootstrap.controllers that can be supplied instead of boostrapbootstrap-servers. This is useful for administrators who wish to bypass the brokers. |
Note that it is not necessary to specify the controller IDs when using --bootstrap-controllerscontroller
.
Here is an example --bootstrap-controllerscontroller
usage:
Code Block |
---|
./bin/kafka-cluster.sh cluster-id --bootstrap-controllerscontroller example.com:9090,example2.com:9090,example3.com:9090 |
...
Code Block |
---|
The Apache Kafka metadata tool positional arguments: command The command to run. optional arguments: -h, --help show this help message and exit --directory DIRECTORY, -d DIRECTORY The __cluster_metadata-0 directory to read. --bootstrap-controllerscontroller CONTROLLERS, -q CONTROLLERS The bootstrap.controllers, used to communicate directly with the metadata quorum. --config CONFIG Path to a property file containing a Kafka configuration |
...
Code Block |
---|
UNSUPPORTED_ENDPOINT_TYPE([next], "This endpoint type is not supported yet.", UnsupportedEndpointTypeException::new), |
There will also be a new UNKNOWN_CONTROLLER_ID error.
Code Block |
---|
UNKNOWN_CONTROLLER_ID([next], "This controller ID is not known.", UnsupportedEndpointTypeUnknownControllerIdException::new), |
DescribeCluster Changes
When bootstrap.controller
is set, the AdminClient will use DescribeClusterResquest
DescribeClusterRequest
rather than MetadataRequest
to obtain the cluster toplolgy.
...
If the MetadataVersion is too old to support controller heartbeatsregistrations, and EndpointType was passed as "controllers," the controller will return UNSUPPORTED_ENDPOINT_TYPE
. This reflects the fact that it doesn't have metadata about the controller endpoints in these older MetadataVersions.
ControllerRegistrationRequest / Response
There will be a new ControllerRegistrationRequest. All controllers will send this to the active controller.
Code Block |
---|
{ "apiKey": ..., "type": "request", "name": "ControllerRegistrationRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "ControllerId", "type": "int32", "versions": "0+", "about": "The ID of the controller IDto register." }, { "name": "IncarnationIdActiveControllerEpoch", "type": "uuidint32", "versions": "0+", "about": "The controllerepoch incarnation of the current active controller." }, { "name": "IncarnationId", "type": "uuid", "versions": "0+", "about": "The controller incarnation ID, which is unique to each process run." }, { "name": "ZkMigrationReady", "type": "bool", "versions": "0+", "about": "Set if the required configurations for ZK migration are present." }, { "name": "Listeners", "type": "[]Listener", "about": "The listeners of this controller", "versions": "0+", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "about": "The name of the endpoint." }, { "name": "Host", "type": "string", "versions": "0+", "about": "The hostname." }, { "name": "Port", "type": "uint16", "versions": "0+", "about": "The port." }, { "name": "SecurityProtocol", "type": "int16", "versions": "0+", "about": "The security protocol." } ] }, { "name": "Features", "type": "[]Feature", "about": "The features on this controller", "versions": "0+", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "about": "The feature name." }, { "name": "MinSupportedVersion", "type": "int16", "versions": "0+", "about": "The minimum supported feature level." }, { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+", "about": "The maximum supported feature level." } ] } ] } |
...
Code Block |
---|
{ "apiKey": ..., "type": "response", "name": "ControllerHeartbeatResponseControllerRegistrationResponse", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The response error code." }, { "name": "ErrorMessage", "type": "string", "nullableVersions": "0+", "versions": "0+", "about": "The response error message, or null if there was no error." } ] } |
The active controller will persist all registrations that are sent in with the correct permissions (CLUSTERACTION on CLUSTER). If the controller is not active, we'll send back a NOT_CONTROLLER error.
The active controller may also return STALE_CONTROLLER_EPOCH if the wrong epoch was passed.
ApiVersionsResponse
The ZkMigrationReady field in ApiVersionsResponse is now deprecated, and won't be filled out.
Code Block |
---|
diff --git a/clients/src/main/resources/common/message/ApiVersionsResponse.json b/clients/src/main/resources/common/message/ApiVersionsResponse.json index 9fda953e10e..eb449f07c54 100644 --- a/clients/src/main/resources/common/message/ApiVersionsResponse.json +++ b/clients/src/main/resources/common/message/ApiVersionsResponse.json @@ -70,8 +70,6 @@ "about": "The cluster-wide finalized min version level for the feature."} ] }, - { "name": "ZkMigrationReady", "type": "bool", "versions": "3+", "taggedVersions": "3+", - "tag": 3, "ignorable": true, "default": "false", - "about": "Set by a KRaft controller if the required configurations "about": "The response error code.for ZK migration are present" }, + { "name": "ErrorMessageZkMigrationReady", "type": "stringdeprecated", "nullableVersionsversions": "03+", "versionstaggedVersions": "03+", "abouttag": "The response error message, or null if there was no error."3 } ] } |
...
RegisterControllerRecord
The data from the registration request will be written to a new ControllerRegistrationRecord
RegisterControllerRecord.
Code Block |
---|
{ "apiKey": ..., "type": "metadata", "name": "RegisterControllerRecord", "validVersions": "0+", "flexibleVersions": "0+", "fields": [ { "name": "ControllerId", "type": "int32", "versions": "0+", "about": "The controller id." }, { "name": "IncarnationId", "type": "uuid", "versions": "0+", "about": "The incarnation ID of the controller process" }, { "name": "ZkMigrationReady", "type": "boolean", "versions": "0+", "about": "Set if the required configurations for ZK migration are present." }, { "name": "EndPoints", "type": "[]ControllerEndpoint", "versions": "0+", "about": "The endpoints that can be used to communicate with this controller.", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "about": "The name of the endpoint." }, { "name": "Host", "type": "string", "versions": "0+", "about": "The hostname." }, { "name": "Port", "type": "uint16", "versions": "0+", "about": "The port." }, { "name": "SecurityProtocol", "type": "int16", "versions": "0+", "about": "The security protocol." } ]}, { "name": "Features", "type": "[]ControllerFeature", "about": "The features on this controller", "versions": "0+", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "about": "The feature name." }, { "name": "MinSupportedVersion", "type": "int16", "versions": "0+", "about": "The minimum supported feature level." }, { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+", "about": "The maximum supported feature level." } ]} ] } |
...
Rather than having a bootstrap.controllers
configuration, we could have a separate configuration like direct.to.controller
and put the controller servers into bootstrap.servers
. Similarly, we could reuse --bootstrap-server
erather than adding --bootstrap-controllerscontroller
.
We decided to go with the scheme proposed above to make it clearer when a tool was going directly to the controller. This also makes it clearer which command-line tools have this capability and which do not.
For example, kafka-console-consumer.sh
does not have the capability to go direct to the controller, since the controller does not handle produces. Therefore, it's intuitive that kafka-console-consumer.sh
lacks the --bootstrap-controllers
flagcontroller
flag.
Another issue is that in the future, we may want to support using the controllers as bootstrap servers for the brokers. The scheme above leaves the door open for this, whereas a scheme that reused existing configurations would not.
...