Status
Current state: "Accepted"
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
When a KRaft broker node shuts down, it becomes "fenced" but remains registered with the controller. To remove a KRaft broker node completely, it must first be unregistered via the Kafka Admin API.
Removing a node without unregistering it causes various issues. For example, newly created partitions can still be assigned to the removed node, and metadata version updates can be blocked after an upgrade. When this happens, the cluster admin/operator needs the node's ID to unregister it. However, none of the existing Admin APIs can list the removed node.
Proposed Changes
This KIP proposes to include fenced brokers in the response to DescribeClusterRequest version 2 or later. A new option, includeFencedBrokers, will be added to DescribeClusterOptions; if it is set to true, fenced brokers will be included in the response. A matching IncludeFencedBrokers field will be added to DescribeClusterRequest, and a new boolean field, "Fenced", will be added to each broker's information in DescribeClusterResponse.
Currently, if EndpointType in the DescribeClusterRequest is 2 (controllers), the registered controller nodes are returned as DescribeClusterBroker entries in the response. Since the DescribeClusterBroker structure will gain the new "Fenced" field, that field will simply be left at its default value, false, for controller nodes, because fencing is not relevant to them.
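The selection rules above can be sketched as a small filter over registered nodes. This is a hypothetical illustration, not the broker's actual request handler: the Registration and DescribedNode types and the select method are invented for this example, while the endpoint type codes (1=brokers, 2=controllers) follow the DescribeClusterRequest schema.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the selection rules; not Kafka's actual request handler.
final class DescribeClusterSelection {

    // Endpoint type codes from the DescribeClusterRequest schema.
    static final byte BROKERS = 1;
    static final byte CONTROLLERS = 2;

    // A registered node as a handler might see it (hypothetical type).
    record Registration(int id, String host, int port, String rack, boolean fenced) {}

    // One entry of the Brokers array in DescribeClusterResponse (hypothetical type).
    record DescribedNode(int id, String host, int port, String rack, boolean fenced) {}

    static List<DescribedNode> select(List<Registration> registrations, short requestVersion,
                                      byte endpointType, boolean includeFencedBrokers) {
        List<DescribedNode> result = new ArrayList<>();
        for (Registration r : registrations) {
            if (endpointType == CONTROLLERS) {
                // Fencing is not relevant to controllers: Fenced keeps its default, false.
                result.add(new DescribedNode(r.id(), r.host(), r.port(), r.rack(), false));
                continue;
            }
            // IncludeFencedBrokers only exists in request v2+, so older requests never see fenced brokers.
            boolean wantFenced = requestVersion >= 2 && includeFencedBrokers;
            if (r.fenced() && !wantFenced) {
                continue;
            }
            result.add(new DescribedNode(r.id(), r.host(), r.port(), r.rack(), r.fenced()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Registration> brokers = List.of(
                new Registration(0, "broker-0", 9092, "rack-a", false),
                new Registration(1, "broker-1", 9092, "rack-b", false),
                new Registration(2, "broker-2", 9092, "rack-c", true));
        System.out.println(select(brokers, (short) 2, BROKERS, true).size());  // 3
        System.out.println(select(brokers, (short) 2, BROKERS, false).size()); // 2
        System.out.println(select(brokers, (short) 1, BROKERS, true).size());  // 2
    }
}
```

Keeping the version check next to the flag makes the backward-compatible behaviour explicit: a v0 or v1 request can never carry the flag, so it always gets the pre-KIP result.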
Public Interfaces
DescribeClusterRequest: v2
    {
      "apiKey": 60,
      "type": "request",
      "listeners": ["zkBroker", "broker", "controller"],
      "name": "DescribeClusterRequest",
      //
      // Version 1 adds EndpointType for KIP-919 support.
      // Version 2 adds IncludeFencedBrokers for KIP-1073 support.
      //
      "validVersions": "0-2",
      "flexibleVersions": "0+",
      "fields": [
        { "name": "IncludeClusterAuthorizedOperations", "type": "bool", "versions": "0+",
          "about": "Whether to include cluster authorized operations." },
        { "name": "EndpointType", "type": "int8", "versions": "1+", "default": "1",
          "about": "The endpoint type to describe. 1=brokers, 2=controllers." },
        // NEW FIELD
        { "name": "IncludeFencedBrokers", "type": "bool", "versions": "2+",
          "about": "Whether to include fenced brokers when listing brokers." }
      ]
    }
DescribeClusterResponse: v2
    {
      "apiKey": 60,
      "type": "response",
      "name": "DescribeClusterResponse",
      //
      // Version 1 adds the EndpointType field, and makes MISMATCHED_ENDPOINT_TYPE and
      // UNSUPPORTED_ENDPOINT_TYPE valid top-level response error codes.
      // Version 2 adds Fenced field to Brokers for KIP-1073 support.
      //
      "validVersions": "0-2",
      "flexibleVersions": "0+",
      "fields": [
        { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
          "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The top-level error code, or 0 if there was no error" },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The top-level error message, or null if there was no error." },
        { "name": "EndpointType", "type": "int8", "versions": "1+", "default": "1",
          "about": "The endpoint type that was described. 1=brokers, 2=controllers." },
        { "name": "ClusterId", "type": "string", "versions": "0+",
          "about": "The cluster ID that responding broker belongs to." },
        { "name": "ControllerId", "type": "int32", "versions": "0+", "default": "-1", "entityType": "brokerId",
          "about": "The ID of the controller broker." },
        { "name": "Brokers", "type": "[]DescribeClusterBroker", "versions": "0+",
          "about": "Each broker in the response.", "fields": [
          { "name": "BrokerId", "type": "int32", "versions": "0+", "mapKey": true, "entityType": "brokerId",
            "about": "The broker ID." },
          { "name": "Host", "type": "string", "versions": "0+",
            "about": "The broker hostname." },
          { "name": "Port", "type": "int32", "versions": "0+",
            "about": "The broker port." },
          { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
            "about": "The rack of the broker, or null if it has not been assigned to a rack." },
          // NEW FIELD
          { "name": "Fenced", "type": "bool", "versions": "2+",
            "about": "Whether the broker is fenced." }
        ]},
        { "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
          "about": "32-bit bitfield to represent authorized operations for this cluster." }
      ]
    }
AdminClient
The DescribeClusterOptions class will be updated with a new boolean option to request fenced brokers.
The Node class used for reading the DescribeClusterResponse data will be updated with a new field, "fenced". If the broker's response does not include fencing information (for example, because the broker only supports an older version of the API), the new field will be set to false by default.
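A minimal sketch of this defaulting behaviour, using a simplified, hypothetical stand-in for the Node class (the real org.apache.kafka.common.Node has more members) and a hypothetical BrokerEntry type for a decoded response entry. Because the Fenced field only exists in DescribeClusterResponse v2 and later, an entry decoded at an older version reports the broker as unfenced.

```java
// Hypothetical stand-ins; Kafka's real Node and generated response classes differ in detail.
final class NodeFencedDefault {

    // Simplified Node with the new "fenced" field.
    record Node(int id, String host, int port, String rack, boolean fenced) {
        // Pre-existing style of constructor: callers that know nothing about fencing get fenced = false.
        Node(int id, String host, int port, String rack) {
            this(id, host, port, rack, false);
        }
    }

    // A broker entry as decoded from DescribeClusterResponse (hypothetical type).
    record BrokerEntry(int brokerId, String host, int port, String rack, boolean fenced) {}

    // Fenced is only on the wire from response v2, so older responses yield unfenced nodes.
    static Node toNode(short responseVersion, BrokerEntry e) {
        if (responseVersion < 2) {
            return new Node(e.brokerId(), e.host(), e.port(), e.rack());
        }
        return new Node(e.brokerId(), e.host(), e.port(), e.rack(), e.fenced());
    }

    public static void main(String[] args) {
        BrokerEntry entry = new BrokerEntry(2, "broker-2", 9092, "rack-c", true);
        System.out.println(toNode((short) 2, entry).fenced()); // true
        System.out.println(toNode((short) 1, entry).fenced()); // false
    }
}
```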
DescribeClusterOptions:
    public class DescribeClusterOptions extends AbstractOptions<DescribeClusterOptions> {

        private boolean includeAuthorizedOperations;
        private boolean includeFencedBrokers;

        /**
         * Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the
         * AdminClient should be used.
         */
        // This method is retained to keep binary compatibility with 0.11
        public DescribeClusterOptions timeoutMs(Integer timeoutMs) {
            this.timeoutMs = timeoutMs;
            return this;
        }

        public DescribeClusterOptions includeAuthorizedOperations(boolean includeAuthorizedOperations) {
            this.includeAuthorizedOperations = includeAuthorizedOperations;
            return this;
        }

        // New option: also return fenced brokers when listing brokers.
        public DescribeClusterOptions includeFencedBrokers(boolean includeFencedBrokers) {
            this.includeFencedBrokers = includeFencedBrokers;
            return this;
        }

        /**
         * Specify if authorized operations should be included in the response. Note that some
         * older brokers cannot supply this information even if it is requested.
         */
        public boolean includeAuthorizedOperations() {
            return includeAuthorizedOperations;
        }

        /**
         * Specify if fenced brokers should be included in the response. Note that older brokers
         * cannot supply this information; requesting it from them results in an UnsupportedVersionException.
         */
        public boolean includeFencedBrokers() {
            return includeFencedBrokers;
        }
    }
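As a usage sketch tied to the motivation, the helper below picks out the fenced brokers that an operator would review before unregistering them. It assumes the node list came from describeCluster() called with includeFencedBrokers(true), and it uses a hypothetical Node stand-in; the isFenced() accessor name is an assumption, since this KIP only says that Node gains a "fenced" field.

```java
import java.util.List;
import java.util.stream.Collectors;

final class FencedBrokerCandidates {

    // Hypothetical stand-in for the Node objects returned by describeCluster();
    // the accessor name isFenced() is an assumption, not taken from this KIP.
    record Node(int id, String host, int port, boolean isFenced) {}

    // IDs of fenced brokers, sorted: the candidates an operator would check before unregistering.
    static List<Integer> fencedBrokerIds(List<Node> nodes) {
        return nodes.stream()
                .filter(Node::isFenced)
                .map(Node::id)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Node> nodes = List.of(
                new Node(0, "broker-0", 9092, false),
                new Node(1, "broker-1", 9092, false),
                new Node(2, "broker-2", 9092, true));
        System.out.println(fencedBrokerIds(nodes)); // [2]
    }
}
```

A fenced broker may simply be restarting, so the returned IDs are candidates to verify, not brokers to unregister blindly.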
kafka-cluster.sh
The console tool used for describing the cluster will be updated with a new command, list-endpoints, that lists the node endpoints returned by the describeCluster API. The new command will have an optional argument, --include-fenced-brokers, to include fenced brokers. When the new command is used with --bootstrap-server, the output will include a STATE column showing whether each broker is fenced or unfenced, and fenced brokers are listed only when --include-fenced-brokers is given. The new command can also be used with --bootstrap-controller to list controller node endpoints; in that case the output will not include the STATE column, since fencing is not relevant to controller nodes, and --include-fenced-brokers is not a valid argument.
The tool will also output the endpoint type, which depends on the bootstrap option used. This makes it clear to users that a single invocation returns either broker endpoints or controller endpoints, never both, which is the current design.
Example:
    ./bin/kafka-cluster.sh --bootstrap-server localhost:9092 list-endpoints --include-fenced-brokers
    ID  HOST          PORT  RACK    STATE     ENDPOINT_TYPE
    0   broker-0      9092  rack-a  unfenced  broker
    1   broker-1      9092  rack-b  unfenced  broker
    2   broker-2      9092  rack-c  fenced    broker

    ./bin/kafka-cluster.sh --bootstrap-server localhost:9092 list-endpoints
    ID  HOST          PORT  RACK    STATE     ENDPOINT_TYPE
    0   broker-0      9092  rack-a  unfenced  broker
    1   broker-1      9092  rack-b  unfenced  broker

    ./bin/kafka-cluster.sh --bootstrap-controller localhost:9093 list-endpoints
    ID  HOST          PORT  RACK    ENDPOINT_TYPE
    0   controller-3  9093          controller
    1   controller-4  9093          controller
    2   controller-5  9093          controller

    ./bin/kafka-cluster.sh --bootstrap-controller localhost:9093 list-endpoints --include-fenced-brokers
    usage: kafka-cluster [-h] {cluster-id,unregister,list-endpoints} ...
    kafka-cluster: error: unrecognized arguments: '--bootstrap-controller'
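The column layout in the example output can be sketched as follows. This is a hypothetical illustration of the formatting rules described for kafka-cluster.sh (a STATE column only for broker endpoints, plus an ENDPOINT_TYPE column), not the tool's source; the Endpoint record, the render method, and the column widths are assumptions. It also assumes the node list already reflects the describeCluster call, so fenced brokers appear only if they were requested.

```java
import java.util.List;

// Hypothetical sketch of list-endpoints output formatting; not the kafka-cluster.sh source.
final class ListEndpointsFormat {

    // One node returned by describeCluster; rack may be null when no rack is assigned.
    record Endpoint(int id, String host, int port, String rack, boolean fenced) {}

    static String rackOrBlank(Endpoint n) {
        return n.rack() == null ? "" : n.rack();
    }

    static String render(List<Endpoint> nodes, boolean bootstrapController) {
        StringBuilder out = new StringBuilder();
        if (bootstrapController) {
            // Fencing does not apply to controllers, so there is no STATE column.
            String fmt = "%-4s %-14s %-6s %-8s %s%n";
            out.append(String.format(fmt, "ID", "HOST", "PORT", "RACK", "ENDPOINT_TYPE"));
            for (Endpoint n : nodes) {
                out.append(String.format(fmt, n.id(), n.host(), n.port(), rackOrBlank(n), "controller"));
            }
        } else {
            String fmt = "%-4s %-14s %-6s %-8s %-10s %s%n";
            out.append(String.format(fmt, "ID", "HOST", "PORT", "RACK", "STATE", "ENDPOINT_TYPE"));
            for (Endpoint n : nodes) {
                out.append(String.format(fmt, n.id(), n.host(), n.port(), rackOrBlank(n),
                        n.fenced() ? "fenced" : "unfenced", "broker"));
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        List<Endpoint> brokers = List.of(
                new Endpoint(0, "broker-0", 9092, "rack-a", false),
                new Endpoint(2, "broker-2", 9092, "rack-c", true));
        System.out.print(render(brokers, false));
    }
}
```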
Compatibility, Deprecation, and Migration Plan
- Existing Admin clients on older versions will continue to receive only unfenced brokers, since they use an older protocol version and the new option, includeFencedBrokers, is not available to them.
- Newer Admin clients talking to older brokers will not receive fenced brokers in the response. The new "fenced" field in the Node class will still be populated when reading the response data and will be set to false for every broker returned. If includeFencedBrokers is set in the request, an UnsupportedVersionException will be thrown.
- If the kafka-cluster.sh tool is used against older brokers with the --include-fenced-brokers flag set, it will print the UnsupportedVersionException.
- If the Admin client is initialised with BOOTSTRAP_CONTROLLER and includeFencedBrokers is set to true when describing the cluster, the Admin client will throw an IllegalArgumentException.
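The client-side checks in the compatibility notes can be sketched as a version-selection step. This is a simplified, hypothetical model: the real Admin client negotiates versions through ApiVersions, the UnsupportedVersionException here is a local stand-in for Kafka's org.apache.kafka.common.errors.UnsupportedVersionException, and chooseVersion with its parameters is invented for the example.

```java
// Hypothetical sketch of the client-side checks; the real Admin client negotiates
// versions via ApiVersions and uses Kafka's own exception classes.
final class DescribeClusterVersionCheck {

    // Local stand-in for org.apache.kafka.common.errors.UnsupportedVersionException.
    static final class UnsupportedVersionException extends RuntimeException {
        UnsupportedVersionException(String message) {
            super(message);
        }
    }

    // Highest DescribeCluster version this (hypothetical) client knows about.
    static final short CLIENT_MAX_VERSION = 2;

    static short chooseVersion(short brokerMaxVersion, boolean bootstrapController,
                               boolean includeFencedBrokers) {
        // Fenced brokers are meaningless when describing controller endpoints.
        if (bootstrapController && includeFencedBrokers) {
            throw new IllegalArgumentException(
                    "includeFencedBrokers is not supported with BOOTSTRAP_CONTROLLER");
        }
        short version = (short) Math.min(CLIENT_MAX_VERSION, brokerMaxVersion);
        // IncludeFencedBrokers exists only in v2+, so the request cannot be expressed for older brokers.
        if (includeFencedBrokers && version < 2) {
            throw new UnsupportedVersionException(
                    "DescribeCluster v2 is required to include fenced brokers, but the broker supports up to v"
                            + brokerMaxVersion);
        }
        return version;
    }

    public static void main(String[] args) {
        System.out.println(chooseVersion((short) 2, false, true));  // 2
        System.out.println(chooseVersion((short) 1, false, false)); // 1
        try {
            chooseVersion((short) 1, false, true);
        } catch (UnsupportedVersionException e) {
            System.out.println("UnsupportedVersionException: " + e.getMessage());
        }
    }
}
```

Failing fast here, rather than silently dropping the flag, matches the stated behaviour: a caller that asked for fenced brokers is told explicitly that the broker cannot supply them.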
Rejected Alternatives
- Returning inactive observer nodes in DescribeMetadataQuorumResponse was considered. However, observer information is only held in the quorum leader's in-memory state, so it is lost when the leader changes.