Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When a KRaft broker node shuts down, it is "fenced" but still registered with the controller. To completely remove KRaft-based broker nodes, they must first be unregistered via the Kafka Admin API.

Removing a node without unregistering it causes various issues: newly created partitions may still be assigned to the removed replicas, and metadata version updates after an upgrade can be blocked. If this happens, the cluster admin/operator needs the node ID to unregister it. However, the admin cannot list the removed node using any of the existing APIs.

Proposed Changes

This KIP proposes to include fenced brokers in the response to DescribeClusterRequest with version 2 or later. This means all brokers, fenced and unfenced, will be included in the list of broker nodes in the response. Also, a new boolean field, "Fenced", will be added to each broker's information in DescribeClusterResponse.

Currently, if EndpointType in the DescribeClusterRequest is 2 (controllers), a list of registered controller nodes is returned as DescribeClusterBroker entries in the result. The DescribeClusterBroker class will be updated with the new field, "fenced". When controller nodes are returned in the response, this field is simply left at its default value of false, since it is not relevant for controller nodes.
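To illustrate how an operator might use the extended response, the sketch below filters a list of described brokers for fenced entries, which are the candidates for unregistration. The BrokerInfo record and the fencedBrokerIds helper are hypothetical stand-ins for the response data; they are not the actual AdminClient or protocol types.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for one DescribeClusterBroker entry in a v2 response.
record BrokerInfo(int brokerId, String host, int port, boolean fenced) {}

class FencedBrokerFilter {
    // Returns the IDs of brokers that are still registered but currently fenced.
    static List<Integer> fencedBrokerIds(List<BrokerInfo> brokers) {
        return brokers.stream()
                .filter(BrokerInfo::fenced)
                .map(BrokerInfo::brokerId)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<BrokerInfo> brokers = List.of(
                new BrokerInfo(0, "broker-0", 9092, false),
                new BrokerInfo(1, "broker-1", 9092, false),
                new BrokerInfo(2, "broker-2", 9092, true));
        // Prints the IDs of the fenced brokers in the sample data.
        System.out.println(fencedBrokerIds(brokers));
    }
}
```

A fenced broker is not necessarily a removed one, since a broker that is merely shut down is also fenced. An operator would still confirm that the node is gone for good before unregistering it.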

Public Interfaces

DescribeClusterRequest: v2

Code Block
languagejava
{
  "apiKey": 60,
  "type": "request",
  "listeners": ["zkBroker", "broker", "controller"],
  "name": "DescribeClusterRequest",
  //
  // Version 1 adds EndpointType for KIP-919 support.
  // Version 2 adds an additional field in the response and the request is unchanged (KIP-1073).
  //
  "validVersions": "0-2",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "IncludeClusterAuthorizedOperations", "type": "bool", "versions": "0+",
      "about": "Whether to include cluster authorized operations." },
    { "name": "EndpointType", "type": "int8", "versions": "1+", "default": "1",
      "about": "The endpoint type to describe. 1=brokers, 2=controllers." }
  ]
}

DescribeClusterResponse: v2

Code Block
languagejava
{
  "apiKey": 60,
  "type": "response",
  "name": "DescribeClusterResponse",
  //
  // Version 1 adds the EndpointType field, and makes MISMATCHED_ENDPOINT_TYPE and
  // UNSUPPORTED_ENDPOINT_TYPE valid top-level response error codes.
  // Version 2 adds Fenced field to Brokers for KIP-1073 support.
  //
  "validVersions": "0-2",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "EndpointType", "type": "int8", "versions": "1+", "default": "1",
      "about": "The endpoint type that was described. 1=brokers, 2=controllers." },
    { "name": "ClusterId", "type": "string", "versions": "0+",
      "about": "The cluster ID that responding broker belongs to." },
    { "name": "ControllerId", "type": "int32", "versions": "0+", "default": "-1", "entityType": "brokerId",
      "about": "The ID of the controller broker." },
    { "name": "Brokers", "type": "[]DescribeClusterBroker", "versions": "0+",
      "about": "Each broker in the response.", "fields": [
      { "name": "BrokerId", "type": "int32", "versions": "0+", "mapKey": true, "entityType": "brokerId",
        "about": "The broker ID." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The broker hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The broker port." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the broker, or null if it has not been assigned to a rack." },
      // NEW FIELD
      { "name": "Fenced", "type": "bool", "versions": "2+",
        "about": "Whether the broker is fenced." }
    ]},
    { "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
      "about": "32-bit bitfield to represent authorized operations for this cluster." }
  ]
}

AdminClient

There will not be any significant change to AdminClient when describing a cluster. However, the Node class used for reading the DescribeClusterResponse data will be updated with a new field, "fenced". If the response from the broker does not include fenced brokers and the new "fenced" field in the broker description, the field in the Node class will default to false.
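The defaulting behaviour described above can be sketched as follows. The NodeInfo record and the fromResponse helper are hypothetical; they only model the rule that "fenced" falls back to false when the response predates version 2, and they are not the real Node class or its constructor.

```java
// Hypothetical model of the client-side node description; not the real Node class.
record NodeInfo(int id, String host, int port, String rack, boolean fenced) {}

class NodeMapper {
    // First DescribeClusterResponse version that carries the Fenced field (per this KIP).
    static final short FENCED_MIN_VERSION = 2;

    // Builds a node description from response data. The fencedInResponse argument
    // is ignored for response versions that do not carry the field, so older
    // brokers always yield fenced == false.
    static NodeInfo fromResponse(short version, int id, String host, int port,
                                 String rack, boolean fencedInResponse) {
        boolean fenced = version >= FENCED_MIN_VERSION && fencedInResponse;
        return new NodeInfo(id, host, port, rack, fenced);
    }

    public static void main(String[] args) {
        // Response from an older broker (v1): the field is absent, so it defaults to false.
        System.out.println(fromResponse((short) 1, 2, "broker-2", 9092, "rack-c", true));
        // Response from a newer broker (v2): the reported value is kept.
        System.out.println(fromResponse((short) 2, 2, "broker-2", 9092, "rack-c", true));
    }
}
```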

kafka-cluster.sh

The console tool used for describing the cluster will be updated with a new command to list nodes. When it is used with --bootstrap-server, the output will include a STATE column that shows whether each broker is fenced. When it is used with --bootstrap-controller, the output will not include the STATE column, as it is not relevant for controller nodes. Also, if there is no rack information for any of the nodes, the RACK column will be omitted from the output.

Example:

Code Block
languagebash
./bin/kafka-cluster.sh --bootstrap-server localhost:9092 list-nodes
ID         HOST      PORT       RACK      STATE
0          broker-0  9092       rack-a    unfenced
1          broker-1  9092       rack-b    unfenced
2          broker-2  9092       rack-c    fenced

./bin/kafka-cluster.sh --bootstrap-controller localhost:9093 list-nodes
ID         HOST            PORT
0          controller-3    9093
1          controller-4    9093
2          controller-5    9093
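The column rules for the list-nodes output could be implemented along these lines. This is only a sketch under stated assumptions: the ListedNode record, the render helper, the fixed column width, and the "-" placeholder for a node without a rack are all hypothetical, and "no rack information for any of the nodes" is read here as "no node has a rack".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical row model; rack is null when the node has no rack assigned.
record ListedNode(int id, String host, int port, String rack, boolean fenced) {}

class ListNodesFormatter {
    // Renders the table. STATE is shown only when describing brokers, and RACK
    // only when at least one node has rack information (an assumed reading).
    static String render(List<ListedNode> nodes, boolean describingBrokers) {
        boolean showRack = nodes.stream().anyMatch(n -> n.rack() != null);

        List<String> header = new ArrayList<>(List.of("ID", "HOST", "PORT"));
        if (showRack) header.add("RACK");
        if (describingBrokers) header.add("STATE");

        StringBuilder out = new StringBuilder(row(header));
        for (ListedNode n : nodes) {
            List<String> cells = new ArrayList<>(List.of(
                    String.valueOf(n.id()), n.host(), String.valueOf(n.port())));
            if (showRack) cells.add(n.rack() == null ? "-" : n.rack());
            if (describingBrokers) cells.add(n.fenced() ? "fenced" : "unfenced");
            out.append(row(cells));
        }
        return out.toString();
    }

    // Left-aligns each cell in a fixed-width column and ends the line.
    private static String row(List<String> cells) {
        StringBuilder sb = new StringBuilder();
        for (String cell : cells) sb.append(String.format("%-16s", cell));
        return sb.toString().stripTrailing() + "\n";
    }

    public static void main(String[] args) {
        System.out.print(render(List.of(
                new ListedNode(0, "broker-0", 9092, "rack-a", false),
                new ListedNode(2, "broker-2", 9092, "rack-c", true)), true));
        System.out.print(render(List.of(
                new ListedNode(0, "controller-3", 9093, null, false)), false));
    }
}
```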

AdminClient API

...

Compatibility, Deprecation, and Migration Plan

  • Existing Admin clients using older versions will only receive the unfenced brokers, since they will be using the older protocol version.
  • Newer Admin clients connecting to older brokers will not receive the fenced brokers in the response. The new "fenced" field in the Node class will still be populated when reading the response data and will be set to false for the brokers returned.

Rejected Alternatives

Returning inactive observer nodes in DescribeMetadataQuorumResponse was considered. However, observer information is only stored in the quorum leader's in-memory state, so this information is lost when there is a leader change.