Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When a KRaft broker node shuts down, it is "fenced" but still registered in the controller. To remove a KRaft-based broker node completely, it must first be unregistered via the Kafka Admin API.

Removing a node without unregistering it causes various issues: for example, newly created partitions may still be assigned to the removed replicas, and metadata version updates may be blocked after an upgrade. If this happens, the cluster admin/operator needs the node ID in order to unregister it. However, the admin currently cannot list the removed node through any of the APIs.

Proposed Changes

This KIP proposes to include fenced brokers in the response to a DescribeClusterRequest with version 2 or later. This means all brokers, fenced and unfenced, will be included in the list of broker nodes in the response. In addition, a new boolean field, "Fenced", will be added to each broker's information in DescribeClusterResponse.

Currently, if EndpointType in the DescribeClusterRequest is 2 (controllers), the list of registered controller nodes is returned as DescribeClusterBroker entries in the result. The DescribeClusterBroker class will be updated with the new field, "fenced". When controller nodes are returned in the response, this field will simply be set to false, its default value, since it is not relevant for controller nodes.
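As a rough illustration of how an operator-side tool might use the proposed field, the sketch below picks out the IDs of fenced brokers from a list of broker entries. The class and method names (FencedBrokerFilter, BrokerEntry, fencedBrokerIds) are hypothetical stand-ins and not part of the Kafka API; only the idea of a per-broker "fenced" flag comes from this KIP. Note that a fenced broker is not necessarily a removed one, so the result is best treated as a list of candidates to review before unregistering.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; these types are NOT the Kafka classes.
final class FencedBrokerFilter {

    // Stand-in for one broker entry of a DescribeClusterResponse (v2 or later).
    static final class BrokerEntry {
        final int brokerId;
        final String host;
        final boolean fenced; // mirrors the "Fenced" field proposed by this KIP

        BrokerEntry(int brokerId, String host, boolean fenced) {
            this.brokerId = brokerId;
            this.host = host;
            this.fenced = fenced;
        }
    }

    // Returns the IDs of all fenced brokers, in the order they were listed.
    static List<Integer> fencedBrokerIds(List<BrokerEntry> brokers) {
        List<Integer> ids = new ArrayList<>();
        for (BrokerEntry broker : brokers) {
            if (broker.fenced) {
                ids.add(broker.brokerId);
            }
        }
        return ids;
    }
}
```

For controller endpoints the flag is always false under this proposal, so the same filter would return an empty list there.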

Public Interfaces

DescribeClusterRequest: v2

Code Block
languagejava
{
  "apiKey": 60,
  "type": "request",
  "listeners": ["zkBroker", "broker", "controller"],
  "name": "DescribeClusterRequest",
  //
  // Version 1 adds EndpointType for KIP-919 support.
  // Version 2 adds an additional field in the response; the request is unchanged (KIP-1073).
  //
  "validVersions": "0-2",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "IncludeClusterAuthorizedOperations", "type": "bool", "versions": "0+",
      "about": "Whether to include cluster authorized operations." },
    { "name": "EndpointType", "type": "int8", "versions": "1+", "default": "1",
      "about": "The endpoint type to describe. 1=brokers, 2=controllers." }
  ]
}

DescribeClusterResponse: v2

Code Block
languagejava
{
  "apiKey": 60,
  "type": "response",
  "name": "DescribeClusterResponse",
  //
  // Version 1 adds the EndpointType field, and makes MISMATCHED_ENDPOINT_TYPE and
  // UNSUPPORTED_ENDPOINT_TYPE valid top-level response error codes.
  // Version 2 adds Fenced field to Brokers for KIP-1073 support.
  //
  "validVersions": "0-2",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "EndpointType", "type": "int8", "versions": "1+", "default": "1",
      "about": "The endpoint type that was described. 1=brokers, 2=controllers." },
    { "name": "ClusterId", "type": "string", "versions": "0+",
      "about": "The cluster ID that responding broker belongs to." },
    { "name": "ControllerId", "type": "int32", "versions": "0+", "default": "-1", "entityType": "brokerId",
      "about": "The ID of the controller broker." },
    { "name": "Brokers", "type": "[]DescribeClusterBroker", "versions": "0+",
      "about": "Each broker in the response.", "fields": [
      { "name": "BrokerId", "type": "int32", "versions": "0+", "mapKey": true, "entityType": "brokerId",
        "about": "The broker ID." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The broker hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The broker port." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the broker, or null if it has not been assigned to a rack." },
      // NEW FIELD
      { "name": "Fenced", "type": "bool", "versions": "2+",
        "about": "Whether the broker is fenced." }
    ]},
    { "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
      "about": "32-bit bitfield to represent authorized operations for this cluster." }
  ]
}

AdminClient

There will not be any significant change to AdminClient when describing a cluster. However, the Node class used for reading the DescribeClusterResponse data will be updated with a new field, "fenced". If the response from the broker does not include fenced brokers and the new "fenced" field in the broker description, the field in the Node class will be set to false by default.
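The defaulting behaviour described above can be sketched as follows. This is only an illustration under stated assumptions: NodeView and fromResponse are hypothetical names (the real change would be to Kafka's Node class), and the version threshold of 2 is taken from the DescribeClusterResponse schema in this KIP.

```java
// Hypothetical sketch; NodeView is NOT the Kafka Node class.
final class FencedDefaultSketch {

    // First DescribeClusterResponse version that carries the "Fenced" field.
    static final short FENCED_MIN_VERSION = 2;

    // Minimal stand-in for the client-side view of a node.
    static final class NodeView {
        final int id;
        final boolean fenced;

        NodeView(int id, boolean fenced) {
            this.id = id;
            this.fenced = fenced;
        }
    }

    // Builds the client-side view. For responses older than v2 the wire value
    // is ignored and "fenced" falls back to false, its default.
    static NodeView fromResponse(int brokerId, short responseVersion, boolean fencedOnWire) {
        boolean fenced = responseVersion >= FENCED_MIN_VERSION && fencedOnWire;
        return new NodeView(brokerId, fenced);
    }
}
```

With this shape, code that reads the flag behaves the same against old and new brokers; it simply never sees a fenced node when talking to an older broker.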

kafka-cluster.sh

The console tool used for describing the cluster will be updated with a new command to list nodes. When it is used with --bootstrap-server, the output will include a STATE column that shows whether each broker is fenced. When it is used with --bootstrap-controller, the output will not include the STATE column, as it is not relevant for controller nodes. Also, if none of the nodes has rack information, the RACK column will be omitted from the output.

Example:

Code Block
languagebash
./bin/kafka-cluster.sh --bootstrap-server localhost:9092 list-nodes
ID         HOST      PORT       RACK      STATE
0          broker-0  9092       rack-a    unfenced
1          broker-1  9092       rack-b    unfenced
2          broker-2  9092       rack-c    fenced

./bin/kafka-cluster.sh --bootstrap-controller localhost:9093 list-nodes
ID         HOST            PORT
0          controller-3    9093
1          controller-4    9093
2          controller-5    9093
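
The column rules for the list-nodes output (STATE shown only for brokers, RACK omitted when no node has a rack) could be implemented along these lines. This is a minimal sketch, not the tool's actual code: ListNodesFormatSketch, Row and format are hypothetical names, the column widths are arbitrary, and leaving the RACK cell blank for a node without a rack when other nodes have one is an assumption.

```java
import java.util.List;

// Hypothetical sketch of the list-nodes table layout; not the real tool code.
final class ListNodesFormatSketch {

    static final class Row {
        final int id;
        final String host;
        final int port;
        final String rack; // null when the node has no rack assigned
        final boolean fenced;

        Row(int id, String host, int port, String rack, boolean fenced) {
            this.id = id;
            this.host = host;
            this.port = port;
            this.rack = rack;
            this.fenced = fenced;
        }
    }

    // showState is true for --bootstrap-server and false for --bootstrap-controller.
    static String format(List<Row> rows, boolean showState) {
        // The RACK column is printed only if at least one node has a rack.
        boolean showRack = false;
        for (Row row : rows) {
            if (row.rack != null) {
                showRack = true;
            }
        }
        StringBuilder sb = new StringBuilder();
        sb.append(String.format("%-10s %-15s %-10s", "ID", "HOST", "PORT"));
        if (showRack) sb.append(String.format(" %-10s", "RACK"));
        if (showState) sb.append(String.format(" %-10s", "STATE"));
        sb.append('\n');
        for (Row row : rows) {
            sb.append(String.format("%-10d %-15s %-10d", row.id, row.host, row.port));
            if (showRack) sb.append(String.format(" %-10s", row.rack == null ? "" : row.rack));
            if (showState) sb.append(String.format(" %-10s", row.fenced ? "fenced" : "unfenced"));
            sb.append('\n');
        }
        return sb.toString();
    }
}
```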

Compatibility, Deprecation, and Migration Plan

  • Existing Admin clients using older versions will only receive the unfenced brokers since they will be using the older protocol version.
  • Newer Admin clients connecting to older brokers will not receive the fenced brokers in the response. The new field in Node class, "fenced" will still be populated when reading the response data and will be set to false for the brokers returned.

Rejected Alternatives

Returning inactive observer nodes in DescribeMetadataQuorumResponse was considered. However, observer information is stored only in the quorum leader's in-memory state, so it is lost whenever there is a leader change.