Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When a KRaft broker node shuts down, it is "fenced" but still registered in the controller. To completely remove KRaft-based broker nodes, they must first be unregistered via the Kafka Admin API.

Removing a node without unregistering it causes various issues: newly created partitions can still be assigned to the removed replicas, and metadata version updates after an upgrade can be blocked. If this happens, the cluster admin/operator needs the node ID in order to unregister the node. However, the removed node cannot be found through the describeQuorum API, because the describeQuorum response excludes observer nodes that have not sent a heartbeat within the 5-minute observer session timeout.

Proposed Changes

This KIP proposes including inactive observers in the response to DescribeMetadataQuorumRequest. A new field, includeInactiveObservers, will be added to DescribeMetadataQuorumOptions. If this option is set to true, AdminClient will read the inactive observers field from the response and include it in the quorum state.
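
As a rough illustration of the distinction this proposal relies on, the sketch below splits observers into active and inactive sets using the 5-minute observer session timeout mentioned in the Motivation section. The class ObserverClassifier, its method names, and the idea of keying on a last-fetch timestamp are assumptions made for illustration only; this is not the controller's actual implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in, not a Kafka class: splits observers by how recently they fetched.
final class ObserverClassifier {
    // The 5-minute observer session timeout described in the Motivation section.
    static final long OBSERVER_SESSION_TIMEOUT_MS = 5 * 60 * 1000L;

    // Ids of observers whose last fetch falls within the session timeout.
    static List<Integer> active(Map<Integer, Long> lastFetchMsById, long nowMs) {
        return select(lastFetchMsById, nowMs, true);
    }

    // Ids of observers whose last fetch is older than the session timeout.
    static List<Integer> inactive(Map<Integer, Long> lastFetchMsById, long nowMs) {
        return select(lastFetchMsById, nowMs, false);
    }

    private static List<Integer> select(Map<Integer, Long> lastFetchMsById, long nowMs, boolean wantActive) {
        List<Integer> ids = new ArrayList<>();
        for (Map.Entry<Integer, Long> entry : lastFetchMsById.entrySet()) {
            boolean isActive = nowMs - entry.getValue() <= OBSERVER_SESSION_TIMEOUT_MS;
            if (isActive == wantActive) {
                ids.add(entry.getKey());
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        Map<Integer, Long> lastFetch = new LinkedHashMap<>();
        lastFetch.put(3, now - 1_000L);            // fetched one second ago
        lastFetch.put(4, now - 2_000L);            // fetched two seconds ago
        lastFetch.put(5, now - 10 * 60 * 1000L);   // last fetched ten minutes ago
        System.out.println("CurrentObservers:  " + active(lastFetch, now));
        System.out.println("InactiveObservers: " + inactive(lastFetch, now));
    }
}
```

In this sketch the two lists are disjoint, which mirrors the intent that InactiveObservers supplements the existing Observers field rather than replacing it.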

Public Interfaces

DescribeMetadataQuorumRequest

The version of DescribeMetadataQuorum will be bumped to 3, but the request format will not change.

DescribeMetadataQuorumResponse

The response is updated with a new field, InactiveObservers, which has the same type as the regular Observers field.

Example:


"apiKey": 55,
"type": "response",
"name": "DescribeQuorumResponse",
// Version 1 adds LastFetchTimeStamp and LastCaughtUpTimestamp in ReplicaState (KIP-836).
// Version 2 adds ErrorMessage, Nodes, ErrorMessage in PartitionData, ReplicaDirectoryId in ReplicaState (KIP-853).
// Version 3 adds InactiveObservers in PartitionData (KIP-1073).
"validVersions": "0-3",
"flexibleVersions": "0+",
"fields": [
 { "name": "ErrorCode", "type": "int16", "versions": "0+",
   "about": "The top level error code."},
 { "name": "ErrorMessage", "type": "string", "versions": "2+", "nullableVersions": "2+", "ignorable": true,
   "about": "The error message, or null if there was no error." },
 { "name": "Topics", "type": "[]TopicData",
   "versions": "0+", "fields": [
   { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
     "about": "The topic name." },
   { "name": "Partitions", "type": "[]PartitionData",
     "versions": "0+", "fields": [
     { "name": "PartitionIndex", "type": "int32", "versions": "0+",
       "about": "The partition index." },
     { "name": "ErrorCode", "type": "int16", "versions": "0+"},
     { "name": "ErrorMessage", "type": "string", "versions": "2+", "nullableVersions": "2+", "ignorable": true,
       "about": "The error message, or null if there was no error." },
     { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
       "about": "The ID of the current leader or -1 if the leader is unknown."},
     { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
       "about": "The latest known leader epoch"},
     { "name": "HighWatermark", "type": "int64", "versions": "0+"},
     { "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+" },
     { "name": "Observers", "type": "[]ReplicaState", "versions": "0+",
       "about": "Observers that are actively fetching from the leader"},
     { "name": "InactiveObservers", "type": "[]ReplicaState", "versions": "3+", "nullableVersions": "3+", "default": null,
       "about": "Observers that have not been active for a while"}
   ]}
 ]},
 { "name": "Nodes", "type": "[]Node", "versions": "2+", "fields": [
   { "name": "NodeId", "type": "int32", "versions": "2+",
     "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node" },
   { "name": "Listeners", "type": "[]Listener",
     "about": "The listeners of this controller", "versions": "2+", "fields": [
     { "name": "Name", "type": "string", "versions": "2+", "mapKey": true,
       "about": "The name of the endpoint" },
     { "name": "Host", "type": "string", "versions": "2+",
       "about": "The hostname" },
     { "name": "Port", "type": "uint16", "versions": "2+",
       "about": "The port" }
   ]}
 ]}
],
"commonStructs": [
 { "name": "ReplicaState", "versions": "0+", "fields": [
   { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId" },
   { "name": "ReplicaDirectoryId", "type": "uuid", "versions": "2+" },
   { "name": "LogEndOffset", "type": "int64", "versions": "0+",
     "about": "The last known log end offset of the follower or -1 if it is unknown"},
   { "name": "LastFetchTimestamp", "type": "int64", "versions": "1+", "ignorable": true, "default": -1,
     "about": "The last known leader wall clock time when a follower fetched from the leader. This is reported as -1 both for the current leader or if it is unknown for a voter"},
   { "name": "LastCaughtUpTimestamp", "type": "int64", "versions": "1+", "ignorable": true, "default": -1,
     "about": "The leader wall clock append time of the offset for which the follower made the most recent fetch request. This is reported as the current time for the leader and -1 if unknown for a voter"}
 ]}
]

DescribeMetadataQuorumOptions

public class DescribeMetadataQuorumOptions extends AbstractOptions<DescribeMetadataQuorumOptions> {

   private boolean includeInactiveObservers;

   public DescribeMetadataQuorumOptions includeInactiveObservers(boolean includeInactiveObservers) {
       this.includeInactiveObservers = includeInactiveObservers;
       return this;
   }

   /**
    * Specify if inactive observers should be included in the response. Note that
    * older controllers cannot supply this information even if it is requested.
    */
   public boolean includeInactiveObservers() {
       return includeInactiveObservers;
   }

}


kafka-metadata-quorum.sh

The console tool used for describing the metadata quorum's status will be updated with a new option, --include-inactive-observers. When used with the --status argument of the describe command, it will return inactive observers as InactiveObservers.


Example:

/bin/kafka-metadata-quorum.sh --bootstrap-controller localhost:9093 describe --status --include-inactive-observers
ClusterId:              hNWu1PEBT9ONqOc1kOeb7g
LeaderId:               1
LeaderEpoch:            15
HighWatermark:          130383
MaxFollowerLag:         0
MaxFollowerLagTimeMs:   0
CurrentVoters:          [0,1,2]
CurrentObservers:       [3,4]
InactiveObservers:      [5] → (Broker 5 has shut down but is still registered)

/bin/kafka-metadata-quorum.sh --bootstrap-controller localhost:9093 describe --status
ClusterId:              hNWu1PEBT9ONqOc1kOeb7g
LeaderId:               1
LeaderEpoch:            15
HighWatermark:          130383
MaxFollowerLag:         0
MaxFollowerLagTimeMs:   0
CurrentVoters:          [0,1,2]
CurrentObservers:       [3,4]

AdminClient API

The client's QuorumInfo will be updated with a new field for inactive observers. The existing createQuorumResult method in AdminClient will be updated to populate this new field if the includeInactiveObservers option is set to true. If the InactiveObservers field returned by the controller is null, or if the includeInactiveObservers option is not set to true, the new field in QuorumInfo will be set to null.
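
The rule in the paragraph above can be sketched as a small mapping function. This is a minimal sketch, not the actual AdminClient code: the class InactiveObserverMapping and the method toQuorumInfoField are hypothetical names, and replica IDs stand in for the full ReplicaState entries.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the mapping step; not the actual AdminClient code.
final class InactiveObserverMapping {
    /**
     * @param includeInactiveObservers value of the option proposed in this KIP
     * @param fromResponse replica ids from the response's InactiveObservers field, or null
     * @return the value to store in QuorumInfo: null when the option is off or the
     *         controller supplied nothing, otherwise a read-only view of the ids
     */
    static List<Integer> toQuorumInfoField(boolean includeInactiveObservers, List<Integer> fromResponse) {
        if (!includeInactiveObservers || fromResponse == null) {
            return null;
        }
        return Collections.unmodifiableList(fromResponse);
    }

    public static void main(String[] args) {
        // Option off: the field stays null even though the controller sent data.
        System.out.println(toQuorumInfoField(false, Arrays.asList(5)));
        // Option on, but the controller sent null: still null.
        System.out.println(toQuorumInfoField(true, null));
        // Option on and data present: the ids are passed through.
        System.out.println(toQuorumInfoField(true, Arrays.asList(5)));
    }
}
```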

Compatibility, Deprecation, and Migration Plan

  • Older versions of AdminClient will not include inactive observers in the QuorumInfo. Consequently, the new field for inactive observers returned by newer controllers will be ignored.
  • When a newer AdminClient connects to older controllers, it will use the older protocol version and will not receive the field for inactive observers. If includeInactiveObservers is set to true when communicating with older controllers, the field value will be set to null.
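
Because a null value can mean either that the option was not set or that the controller is too old to supply the field, calling code may want to treat null differently from an empty list. The sketch below shows one way a caller might do that; the class UnregisterCandidates and its messages are hypothetical and not part of the proposed API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical caller-side helper; not part of the proposed API.
final class UnregisterCandidates {
    // Summarizes the inactive-observer field, keeping null distinct from an empty list.
    static String describe(List<Integer> inactiveObservers) {
        if (inactiveObservers == null) {
            return "inactive observers unavailable (not requested, or older controller)";
        }
        if (inactiveObservers.isEmpty()) {
            return "no inactive observers";
        }
        return "candidates for unregistration: " + inactiveObservers;
    }

    public static void main(String[] args) {
        System.out.println(describe(null));
        System.out.println(describe(Collections.<Integer>emptyList()));
        System.out.println(describe(Arrays.asList(5)));
    }
}
```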

Rejected Alternatives

Adding a new field to DescribeClusterResponse to list registered but inactive observer nodes was considered. However, this information is typically an administrator requirement, not of interest to regular users. The main goal is to allow administrators to retrieve broker IDs for unregistration, whereas the DescribeCluster API is more suitable for regular users.
