Status

Current state: "Under Discussion"

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When a KRaft broker node shuts down, it is "fenced", but still registered in the controller. To completely remove KRaft-based broker nodes, they must first be unregistered via the Kafka Admin API.

Removing a node without unregistering it causes various issues: newly created partitions may still be assigned to the removed replicas, and metadata version updates can be blocked after an upgrade. If this happens, the cluster admin/operator needs the node ID in order to unregister the node. However, the admin cannot list the removed node using any of the existing APIs.

Proposed Changes

This KIP proposes to optionally include fenced brokers in the response to DescribeClusterRequest. A new boolean field, includeFencedBrokers, will be added to DescribeClusterOptions. By default, fenced brokers will not be included in the list of broker nodes returned. In addition, a new boolean field, isFenced, will be added to each broker's information in DescribeClusterResponse.
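
The default behavior described above can be pictured with a small, self-contained sketch. It is not the Kafka implementation: the class FencedBrokerFilter, the nested BrokerEntry type, and the visibleBrokers method are hypothetical names invented for illustration, and only the includeFencedBrokers and isFenced names come from this KIP.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only; none of these types exist in Kafka.
final class FencedBrokerFilter {

    // Hypothetical stand-in for one broker entry in DescribeClusterResponse.
    static final class BrokerEntry {
        final int brokerId;
        final boolean isFenced; // mirrors the proposed IsFenced field

        BrokerEntry(int brokerId, boolean isFenced) {
            this.brokerId = brokerId;
            this.isFenced = isFenced;
        }
    }

    // Returns the entries that would be listed for the given request flag:
    // fenced brokers are dropped unless includeFencedBrokers is true.
    static List<BrokerEntry> visibleBrokers(List<BrokerEntry> registered,
                                            boolean includeFencedBrokers) {
        List<BrokerEntry> visible = new ArrayList<>();
        for (BrokerEntry broker : registered) {
            if (includeFencedBrokers || !broker.isFenced) {
                visible.add(broker);
            }
        }
        return visible;
    }

    public static void main(String[] args) {
        List<BrokerEntry> registered = List.of(
            new BrokerEntry(0, false),
            new BrokerEntry(1, false),
            new BrokerEntry(2, true)); // shut down but still registered

        System.out.println(visibleBrokers(registered, false).size()); // default request
        System.out.println(visibleBrokers(registered, true).size());  // fenced brokers requested
    }
}
```

Keeping the flag off by default means existing callers of the describe-cluster API would see the same broker list as before, which is presumably why the KIP makes the new behavior opt-in.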

Public Interfaces

DescribeClusterRequest: v2

Code Block
languagejava
{
  "apiKey": 60,
  "type": "request",
  "listeners": ["zkBroker", "broker", "controller"],
  "name": "DescribeClusterRequest",
  //
  // Version 1 adds EndpointType for KIP-919 support.
  // Version 2 adds IncludeFencedBrokers for KIP-1073 support.
  //
  "validVersions": "0-2",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "IncludeClusterAuthorizedOperations", "type": "bool", "versions": "0+",
      "about": "Whether to include cluster authorized operations." },
    { "name": "EndpointType", "type": "int8", "versions": "1+", "default": "1",
      "about": "The endpoint type to describe. 1=brokers, 2=controllers." },
    //NEW FIELD
    { "name": "IncludeFencedBrokers", "type": "bool", "versions": "2+",
      "about": "Whether to include fenced brokers when listing brokers." }
  ]
}

DescribeClusterResponse: v2

Code Block
languagejava
{
  "apiKey": 60,
  "type": "response",
  "name": "DescribeClusterResponse",
  //
  // Version 1 adds the EndpointType field, and makes MISMATCHED_ENDPOINT_TYPE and
  // UNSUPPORTED_ENDPOINT_TYPE valid top-level response error codes.
  // Version 2 adds IsFenced field to Brokers for KIP-1073 support.
  //
  "validVersions": "0-2",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "EndpointType", "type": "int8", "versions": "1+", "default": "1",
      "about": "The endpoint type that was described. 1=brokers, 2=controllers." },
    { "name": "ClusterId", "type": "string", "versions": "0+",
      "about": "The cluster ID that responding broker belongs to." },
    { "name": "ControllerId", "type": "int32", "versions": "0+", "default": "-1", "entityType": "brokerId",
      "about": "The ID of the controller broker." },
    { "name": "Brokers", "type": "[]DescribeClusterBroker", "versions": "0+",
      "about": "Each broker in the response.", "fields": [
      { "name": "BrokerId", "type": "int32", "versions": "0+", "mapKey": true, "entityType": "brokerId",
        "about": "The broker ID." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The broker hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The broker port." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the broker, or null if it has not been assigned to a rack." },
      //NEW FIELD
      { "name": "IsFenced", "type": "bool", "versions": "2+",
        "about": "Whether the broker is fenced" }
    ]},
    { "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
      "about": "32-bit bitfield to represent authorized operations for this cluster." }
  ]
}

AdminClient API changes

DescribeClusterOptions

Code Block
languagejava
public class DescribeClusterOptions extends AbstractOptions<DescribeClusterOptions> {

    private boolean includeAuthorizedOperations;

    private boolean includeFencedBrokers;  //NEW OPTION

    /**
     * Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the
     * AdminClient should be used.
     *
     */
    // This method is retained to keep binary compatibility with 0.11
    public DescribeClusterOptions timeoutMs(Integer timeoutMs) {
        this.timeoutMs = timeoutMs;
        return this;
    }

    public DescribeClusterOptions includeAuthorizedOperations(boolean includeAuthorizedOperations) {
        this.includeAuthorizedOperations = includeAuthorizedOperations;
        return this;
    }

    public DescribeClusterOptions includeFencedBrokers(boolean includeFencedBrokers) {
        this.includeFencedBrokers = includeFencedBrokers;
        return this;
    }

    /**
     * Specify if authorized operations should be included in the response.  Note that some
     * older brokers cannot supply this information even if it is requested.
     */
    public boolean includeAuthorizedOperations() {
        return includeAuthorizedOperations;
    }

    /**
     * Specify if fenced brokers should be included in the response.  Note that some
     * older brokers cannot supply this information even if it is requested.
     */
    public boolean includeFencedBrokers() {
        return includeFencedBrokers;
    }

}

  

kafka-cluster.sh

The console tool used for describing the cluster will be updated with a new command, list-brokers, and a parameter, --include-fenced-brokers, to include fenced brokers in the listing.

Example:

Code Block
languagebash
./bin/kafka-cluster.sh --bootstrap-server localhost:9092 list-brokers
ID         HOST      PORT       STATE      RACK
0          broker-0  9092       unfenced
1          broker-1  9092       unfenced


./bin/kafka-cluster.sh --bootstrap-server localhost:9092 list-brokers --include-fenced-brokers
ID         HOST      PORT       STATE      RACK
0          broker-0  9092       unfenced
1          broker-1  9092       unfenced
2          broker-2  9092       fenced   → (Broker 2 has shut down but is still registered)
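
To connect the listing back to the motivation, the following self-contained sketch shows how an operator-side script might pick out fenced broker IDs as candidates for unregistration. Everything here is an assumption made for illustration: UnregisterCandidates, ListedBroker, and fencedBrokerIds are hypothetical names, not part of Kafka or of this KIP. A fenced broker may simply be restarting, so the result is a list to review, not a list to unregister automatically.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative helper only; none of these types exist in Kafka.
final class UnregisterCandidates {

    // Hypothetical row of a broker listing that includes fenced brokers.
    static final class ListedBroker {
        final int id;
        final String host;
        final int port;
        final boolean fenced;

        ListedBroker(int id, String host, int port, boolean fenced) {
            this.id = id;
            this.host = host;
            this.port = port;
            this.fenced = fenced;
        }
    }

    // Collects the IDs of fenced brokers in ascending order.
    // The caller still has to confirm that each broker is really gone.
    static List<Integer> fencedBrokerIds(List<ListedBroker> listing) {
        List<Integer> ids = new ArrayList<>();
        for (ListedBroker broker : listing) {
            if (broker.fenced) {
                ids.add(broker.id);
            }
        }
        Collections.sort(ids);
        return ids;
    }

    public static void main(String[] args) {
        List<ListedBroker> listing = List.of(
            new ListedBroker(0, "broker-0", 9092, false),
            new ListedBroker(1, "broker-1", 9092, false),
            new ListedBroker(2, "broker-2", 9092, true));

        System.out.println("Review before unregistering: " + fencedBrokerIds(listing));
    }
}
```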

Compatibility, Deprecation, and Migration Plan

...

Rejected Alternatives

...