Status
Current state: Draft
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
In a Raft-based quorum system, cluster membership changes (adding or removing voters) are applied through the log replication process. A membership change becomes effective only after it has been committed — that is, replicated to a majority of the current configuration. Between the moment a membership change entry is appended and the moment it is committed, the cluster exists in a transitional state where the leader’s in-memory voter list (voterSet) may include uncommitted changes.
By maintaining a separate committedVoterSet, the system ensures:
Safety: Both committed and uncommitted voters participate in quorum decisions and elections. However, uncommitted voters (those whose VotersRecord has been appended but not yet committed) carry a risk: if the leader crashes before the VotersRecord is committed, the voter change may be lost due to log truncation, leading to potential configuration inconsistencies.
Stability: When the committed and uncommitted voter sets differ, the cluster is in a joint consensus phase. Exposing this state allows management and orchestration systems to recognize that the controller quorum is undergoing a transitional reconfiguration, and to avoid assuming the new configuration is already stable.
Debugging and Compliance Verification: During troubleshooting or post-incident audits, operators and tools must be able to differentiate between committed voters and uncommitted voters that have started replicating but are not yet officially part of the quorum. This distinction is critical for confirming whether a reconfiguration completed successfully and whether the cluster’s control plane reached a consistent state.
Public Interfaces
Request Schema
{
"apiKey": 55,
"type": "request",
"listeners": ["broker", "controller"],
"name": "DescribeQuorumRequest",
// Version 1 adds additional fields in the response. The request is unchanged (KIP-836).
// Version 2 adds additional fields in the response. The request is unchanged (KIP-853).
+ // Version 3 adds additional fields in the response. The request is unchanged.
+ "validVersions": "0-3",
"flexibleVersions": "0+",
"latestVersionUnstable": false,
"fields": [
{ "name": "Topics", "type": "[]TopicData", "versions": "0+",
"about": "The topics to describe.", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
"about": "The partitions to describe.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." }
]
}]
}
]
}
Response Schema
{
"apiKey": 55,
"type": "response",
"name": "DescribeQuorumResponse",
// Version 1 adds LastFetchTimeStamp and LastCaughtUpTimestamp in ReplicaState (KIP-836).
// Version 2 adds ErrorMessage, Nodes, ErrorMessage in PartitionData, ReplicaDirectoryId in ReplicaState (KIP-853).
+ // Version 3 adds CommittedVoters in PartitionData
+ "validVersions": "0-3",
"flexibleVersions": "0+",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top level error code."},
{ "name": "ErrorMessage", "type": "string", "versions": "2+", "nullableVersions": "2+", "ignorable": true,
"about": "The error message, or null if there was no error." },
{ "name": "Topics", "type": "[]TopicData",
"versions": "0+", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]PartitionData",
"versions": "0+", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+"},
{ "name": "ErrorMessage", "type": "string", "versions": "2+", "nullableVersions": "2+", "ignorable": true,
"about": "The error message, or null if there was no error." },
{ "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
"about": "The ID of the current leader or -1 if the leader is unknown."},
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+",
"about": "The latest known leader epoch"},
{ "name": "HighWatermark", "type": "int64", "versions": "0+"},
{ "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+" },
+ { "name": "CommittedVoters", "type": "[]ReplicaState", "versions": "3+", "ignorable": true, "about": "The voters that have been committed." },
{ "name": "Observers", "type": "[]ReplicaState", "versions": "0+" }
]}
]},
{ "name": "Nodes", "type": "[]Node", "versions": "2+", "fields": [
{ "name": "NodeId", "type": "int32", "versions": "2+",
"mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node" },
{ "name": "Listeners", "type": "[]Listener",
"about": "The listeners of this controller", "versions": "2+", "fields": [
{ "name": "Name", "type": "string", "versions": "2+", "mapKey": true,
"about": "The name of the endpoint" },
{ "name": "Host", "type": "string", "versions": "2+",
"about": "The hostname" },
{ "name": "Port", "type": "uint16", "versions": "2+",
"about": "The port" }
]}
]}
],
"commonStructs": [
{ "name": "ReplicaState", "versions": "0+", "fields": [
{ "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId" },
{ "name": "ReplicaDirectoryId", "type": "uuid", "versions": "2+" },
{ "name": "LogEndOffset", "type": "int64", "versions": "0+",
"about": "The last known log end offset of the follower or -1 if it is unknown"},
{ "name": "LastFetchTimestamp", "type": "int64", "versions": "1+", "ignorable": true, "default": -1,
"about": "The last known leader wall clock time time when a follower fetched from the leader. This is reported as -1 both for the current leader or if it is unknown for a voter"},
{ "name": "LastCaughtUpTimestamp", "type": "int64", "versions": "1+", "ignorable": true, "default": -1,
"about": "The leader wall clock append time of the offset for which the follower made the most recent fetch request. This is reported as the current time for the leader and -1 if unknown for a voter"}
]}
]
}
We also add a new field, committedVoters, to QuorumInfo.
public class QuorumInfo {
private final int leaderId;
private final long leaderEpoch;
private final long highWatermark;
private final List<ReplicaState> voters;
+ private final List<ReplicaState> committedVoters;
private final List<ReplicaState> observers;
private final Map<Integer, Node> nodes;
QuorumInfo(
int leaderId,
long leaderEpoch,
long highWatermark,
List<ReplicaState> voters,
+ List<ReplicaState> committedVoters,
List<ReplicaState> observers,
Map<Integer, Node> nodes
) {
this.leaderId = leaderId;
this.leaderEpoch = leaderEpoch;
this.highWatermark = highWatermark;
this.voters = voters;
+ this.committedVoters = committedVoters;
this.observers = observers;
this.nodes = nodes;
}
}
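As an illustration of how a caller might consume the new field, the sketch below compares the replica IDs in voters with those in committedVoters to decide whether a voter change is still pending. The class and method names are hypothetical and not part of the Kafka API. The sketch works on plain replica IDs, ignores directory IDs for brevity, and assumes that an empty committed set means "not yet known" rather than "no voters".

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper, not part of the Kafka API: compares current and committed voter IDs. */
final class VoterDiffSketch {
    /** IDs present in the current voter set but not yet in the committed one. */
    static Set<Integer> pendingAdditions(Set<Integer> current, Set<Integer> committed) {
        Set<Integer> added = new TreeSet<>(current);
        added.removeAll(committed);
        return added;
    }

    /** IDs still in the committed voter set but already dropped from the current one. */
    static Set<Integer> pendingRemovals(Set<Integer> current, Set<Integer> committed) {
        Set<Integer> removed = new TreeSet<>(committed);
        removed.removeAll(current);
        return removed;
    }

    /** True when the committed set is known and differs from the current set. */
    static boolean reconfigurationPending(Set<Integer> current, Set<Integer> committed) {
        // Assumption: an empty committed set means "unknown", not "no voters".
        return !committed.isEmpty() && !current.equals(committed);
    }

    public static void main(String[] args) {
        Set<Integer> current = Set.of(1, 2, 3, 4);
        Set<Integer> committed = Set.of(1, 2, 3);
        System.out.println(pendingAdditions(current, committed));       // [4]
        System.out.println(reconfigurationPending(current, committed)); // true
    }
}
```

An orchestration tool could, for example, postpone a rolling restart while reconfigurationPending returns true.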
Proposed Changes
Raft State Tracking
This proposal introduces a mechanism for explicitly tracking the committed voter set within LeaderState. The objective is to provide an accurate representation of the last committed quorum membership.
LeaderState is extended to reference the KRaftControlRecordStateMachine, from which it derives the voter set history. Using this history together with the high watermark from LeaderState, the committed voter set can be obtained directly from KRaftControlRecordStateMachine. Both the voter set history in the KRaftControlRecordStateMachine and the high watermark are maintained entirely in memory, which eliminates disk I/O and minimizes the performance impact.
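The lookup described above can be sketched as follows. This is a minimal illustration, not the actual KRaftControlRecordStateMachine code: the class VoterHistorySketch and its methods are hypothetical, voters are reduced to integer IDs, and the sketch assumes that a VotersRecord at offset o counts as committed once the high watermark is greater than o.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical stand-in for the in-memory voter set history; not the actual Kafka class. */
final class VoterHistorySketch {
    // Offset of each VotersRecord -> voter IDs declared by that record.
    private final TreeMap<Long, Set<Integer>> history = new TreeMap<>();

    void addAt(long offset, Set<Integer> voterIds) {
        history.put(offset, new TreeSet<>(voterIds));
    }

    /** Latest appended voter set, whether or not it is committed. */
    Optional<Set<Integer>> latest() {
        return Optional.ofNullable(history.lastEntry()).map(Map.Entry::getValue);
    }

    /**
     * Latest voter set whose record offset is strictly below the high watermark.
     * Assumption: an entry at offset o is committed once highWatermark > o.
     */
    Optional<Set<Integer>> committedAt(long highWatermark) {
        return Optional.ofNullable(history.lowerEntry(highWatermark)).map(Map.Entry::getValue);
    }

    public static void main(String[] args) {
        VoterHistorySketch h = new VoterHistorySketch();
        h.addAt(0L, Set.of(1, 2, 3));
        h.addAt(10L, Set.of(1, 2, 3, 4));             // appended, not yet committed
        System.out.println(h.committedAt(10L).get()); // [1, 2, 3]
        System.out.println(h.committedAt(11L).get()); // [1, 2, 3, 4]
    }
}
```

Because the history is a sorted in-memory map, the lookup is a single ordered-map query and touches no log segments.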
Committed voter present in current voters/observers
If a committed voter is also present in the current voters or observers set, its ReplicaState is identical to the corresponding entry in those sets. This ensures that all runtime fields within ReplicaState are available and reflect the current replica state.
Committed voter absent from voters or observers
If a committed voter is not found in either voters or observers, certain runtime-specific fields cannot be derived, since the replica is no longer actively participating in log replication. To maintain schema consistency, these fields in its ReplicaState are set to sentinel values:
hasAcknowledgedLeader = false
endLogOffset = -1
lastFetchTimestamp = -1
lastCaughtUpTimestamp = -1
The fields above are only meaningful for active replicas (voters) in the quorum. For committed voters, they are not strictly required; the sentinel values simply indicate that the data is unavailable or not applicable.
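The two cases above can be sketched as a single merge step. The State class below is a simplified, hypothetical mirror of the runtime fields named in this section, and describe is an illustrative helper rather than the proposed implementation: it reuses the live entry when the committed voter is still tracked as a voter or observer, and otherwise fills in the sentinel values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of building the committed voter states; not the actual Kafka code. */
final class CommittedVoterStateSketch {
    /** Simplified mirror of the runtime fields named in the text. */
    static final class State {
        final int replicaId;
        final boolean hasAcknowledgedLeader;
        final long endLogOffset;
        final long lastFetchTimestamp;
        final long lastCaughtUpTimestamp;

        State(int replicaId, boolean hasAcknowledgedLeader, long endLogOffset,
              long lastFetchTimestamp, long lastCaughtUpTimestamp) {
            this.replicaId = replicaId;
            this.hasAcknowledgedLeader = hasAcknowledgedLeader;
            this.endLogOffset = endLogOffset;
            this.lastFetchTimestamp = lastFetchTimestamp;
            this.lastCaughtUpTimestamp = lastCaughtUpTimestamp;
        }

        /** Sentinel entry for a committed voter the leader no longer tracks. */
        static State unavailable(int replicaId) {
            return new State(replicaId, false, -1L, -1L, -1L);
        }
    }

    /** Returns one State per committed voter ID, in ascending ID order. */
    static List<State> describe(Set<Integer> committedIds,
                                Map<Integer, State> voters,
                                Map<Integer, State> observers) {
        List<State> result = new ArrayList<>();
        for (int id : new TreeSet<>(committedIds)) {
            // Prefer the live entry so runtime fields reflect the current replica state.
            State live = voters.containsKey(id) ? voters.get(id) : observers.get(id);
            result.add(live != null ? live : State.unavailable(id));
        }
        return result;
    }

    public static void main(String[] args) {
        State live = new State(1, true, 100L, 5L, 4L);
        List<State> out = describe(Set.of(1, 2), Map.of(1, live), Map.of());
        System.out.println(out.get(0).endLogOffset); // 100
        System.out.println(out.get(1).endLogOffset); // -1
    }
}
```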
High watermark is unknown when a new leader is elected
- In this case, an empty set is returned because the high watermark is not yet known.
- Committed voters are returned as a non-empty set once the leader has updated the high watermark.
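The behavior across a leader epoch might look like the sketch below. LeaderEpochSketch is a hypothetical model, not the actual LeaderState class; it represents the unknown high watermark as an empty OptionalLong and again assumes that a voter set recorded at offset o is committed once the high watermark exceeds o.

```java
import java.util.Collections;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical model of a leader epoch; not the actual LeaderState class. */
final class LeaderEpochSketch {
    // Offset of each VotersRecord -> voter IDs declared by that record.
    private final TreeMap<Long, Set<Integer>> voterHistory = new TreeMap<>();
    // Empty right after election: the new leader has not yet established a high watermark.
    private OptionalLong highWatermark = OptionalLong.empty();

    LeaderEpochSketch(Map<Long, Set<Integer>> replicatedHistory) {
        replicatedHistory.forEach((offset, ids) -> voterHistory.put(offset, new TreeSet<>(ids)));
    }

    void updateHighWatermark(long newHighWatermark) {
        highWatermark = OptionalLong.of(newHighWatermark);
    }

    /** Empty while the high watermark is unknown; otherwise the latest committed voter set. */
    Set<Integer> committedVoters() {
        if (!highWatermark.isPresent()) {
            return Collections.emptySet();
        }
        Map.Entry<Long, Set<Integer>> entry = voterHistory.lowerEntry(highWatermark.getAsLong());
        return entry == null ? Collections.emptySet() : entry.getValue();
    }

    public static void main(String[] args) {
        LeaderEpochSketch leader = new LeaderEpochSketch(
            Map.of(0L, Set.of(1, 2, 3), 10L, Set.of(1, 2, 3, 4)));
        System.out.println(leader.committedVoters()); // []
        leader.updateHighWatermark(10L);
        System.out.println(leader.committedVoters()); // [1, 2, 3]
    }
}
```

Clients that read the field would therefore need to treat an empty result as "unknown" rather than as a quorum with no voters.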
Compatibility, Deprecation, and Migration Plan
- Bump DescribeQuorumRequest and DescribeQuorumResponse to a newer version, with backward compatibility preserved for the old protocol version.
Test Plan
- Unit tests and integration tests will be added.
Rejected Alternatives
- A new field, state, could be introduced within the ReplicaState structure to explicitly track whether a voter is committed or uncommitted.