You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 10 Next »

Status

Current state: "Under Discussion"

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When a KRaft broker node shuts down, it is "fenced", but still registered in the controller. To completely remove KRaft-based broker nodes, they must first be unregistered via the Kafka Admin API.

Removing a node without unregistering causes various issues, such as a newly created partition will still get assigned to the removed replicas, or preventing metadata version updates after an upgrade. If this happens, the cluster admin/operator needs to get the node id to unregister it. However, admin cannot list the removed node using any of the APIs.

Proposed Changes

This KIP proposes to optionally include fenced brokers in the response to DescribeClusterRequest. A new boolean field,  IncludeFencedBrokers , will be added to DescribeClusterRequest. By default, fenced brokers will not be included in the list of broker nodes returned. Also a new boolean field,  Fenced,  will be added to each broker's information in DescribeClusterResponse.

Public Interfaces

DescribeClusterRequest: v2

{
  "apiKey": 60,
  "type": "request",
  "listeners": ["zkBroker", "broker", "controller"],
  "name": "DescribeClusterRequest",
  //
  // Version 1 adds EndpointType for KIP-919 support.
  // Version 2 adds IncludeFencedBrokers for KIP-1073 support.
  //
  "validVersions": "0-2",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "IncludeClusterAuthorizedOperations", "type": "bool", "versions": "0+",
      "about": "Whether to include cluster authorized operations." },
    { "name": "EndpointType", "type": "int8", "versions": "1+", "default": "1",
      "about": "The endpoint type to describe. 1=brokers, 2=controllers." },
     //NEW FIELD
    { "name": "IncludeFencedBrokers", "type": "bool", "versions": "2+",
      "about": "Whether to include fenced brokers." }
  ]
}

DescribeClusterResponse: v2

{
  "apiKey": 60,
  "type": "response",
  "name": "DescribeClusterResponse",
  //
  // Version 1 adds the EndpointType field, and makes MISMATCHED_ENDPOINT_TYPE and
  // UNSUPPORTED_ENDPOINT_TYPE valid top-level response error codes.
  // Version 2 adds Fenced field to Brokers for KIP-1073 support.
  //
  "validVersions": "0-2",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "EndpointType", "type": "int8", "versions": "1+", "default": "1",
      "about": "The endpoint type that was described. 1=brokers, 2=controllers." },
    { "name": "ClusterId", "type": "string", "versions": "0+",
      "about": "The cluster ID that responding broker belongs to." },
    { "name": "ControllerId", "type": "int32", "versions": "0+", "default": "-1", "entityType": "brokerId",
      "about": "The ID of the controller broker." },
    { "name": "Brokers", "type": "[]DescribeClusterBroker", "versions": "0+",
      "about": "Each broker in the response.", "fields": [
      { "name": "BrokerId", "type": "int32", "versions": "0+", "mapKey": true, "entityType": "brokerId",
        "about": "The broker ID." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The broker hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The broker port." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the broker, or null if it has not been assigned to a rack." },
      //NEW FIELD
      { "name": "Fenced", "type": "bool", "versions": "2+", 
        "about": "Whether the broker is fenced." }
    ]},
    { "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
      "about": "32-bit bitfield to represent authorized operations for this cluster." }
  ]
}

AdminClient API changes

A new field, includeFencedBrokers, will be added to DescribeClusterOptions. By default, this option is set to false. When includeFencedBrokers is set to true, any fenced brokers will be included in the DescribeClusterResult returned by Admin clients. 

DescribeClusterOptions

public class DescribeClusterOptions extends AbstractOptions<DescribeClusterOptions> {

    private boolean includeAuthorizedOperations;

    private boolean includeFencedBrokers;  //NEW OPTION

    /**
     * Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the
     * AdminClient should be used.
     *
     */
    // This method is retained to keep binary compatibility with 0.11
    public DescribeClusterOptions timeoutMs(Integer timeoutMs) {
        this.timeoutMs = timeoutMs;
        return this;
    }

    public DescribeClusterOptions includeAuthorizedOperations(boolean includeAuthorizedOperations) {
        this.includeAuthorizedOperations = includeAuthorizedOperations;
        return this;
    }

    public DescribeClusterOptions includeFencedBrokers(boolean includeFencedBrokers) {
        this.includeFencedBrokers = includeFencedBrokers;
        return this;
    }

    /**
     * Specify if authorized operations should be included in the response.  Note that some
     * older brokers cannot not supply this information even if it is requested.
     */
    public boolean includeAuthorizedOperations() {
        return includeAuthorizedOperations;
    }

    /**
     * Specify if fenced brokers should be included in the response.  Note that some
     * older brokers cannot not supply this information even if it is requested.
     */
    public boolean includeFencedBrokers() {
        return includeFencedBrokers;
    }
}
  

kafka-cluster.sh

The console tool used for describing cluster will be updated with a new command to list brokers and a parameter to include fenced brokers.

Example:

./bin/kafka-cluster.sh --bootstrap-server localhost:9092 list-brokers
ID         HOST      PORT       STATE      RACK       
0          broker-0  9092       unfenced              
1          broker-1  9092       unfenced          


./bin/kafka-cluster.sh --bootstrap-server localhost:9092 list-brokers --include-fenced-brokers
ID         HOST      PORT       STATE      RACK       
0          broker-0  9192       unfenced              
1          broker-1  9092       unfenced    
2          broker-2  9092       fenced   → (Broker 2 has shutdown but still registered)

Compatibility, Deprecation, and Migration Plan

  • Existing Admin clients using older versions will not request fenced brokers in DescribeCluster requests and the default is to return unfenced brokers only. This keeps older clients compatible with newer brokers.
  • Newer Admin clients connecting to older brokers will use the older protocol version and hence will not request fenced brokers.

Rejected Alternatives

Returning inactive observer nodes in DescribeMetadataQuorumResponse was considered. However, observers information is only stored in the quorum leader's memory state, therefore this information is lost when there is a leader change. 


  • No labels