Status

Current state: Implementing

Discussion thread: https://lists.apache.org/thread/pqj9f1r3rk83oqtxxtg6y5h7m7cf56r2

JIRA: KAFKA-16164 (https://issues.apache.org/jira/browse/KAFKA-16164)

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP will go over scenarios where we might expect disruptive servers and discuss how Pre-Vote (as originally detailed in the extended Raft paper and in KIP-650) can ensure correctness when it comes to network partitions.  

Pre-Vote is the idea of a server “canvassing” the cluster to check whether it would receive a majority of votes before starting a real election. If yes, it increases its epoch and sends a standard vote request (the potentially disruptive step). If not, it does not increase its epoch and does not send a vote request.

Throughout this KIP, we will differentiate between Pre-Vote and the original Vote request behavior with "Pre-Vote" and "standard Vote".

Disruptive server scenarios

When a follower becomes partitioned from the rest of the quorum, it will continuously increase its epoch to start elections until it is able to regain connection to the leader/rest of the quorum. When the server regains connectivity, it will disturb the rest of the quorum as they will be forced to participate in an unnecessary election. While this situation only results in one leader stepping down, as we start supporting larger quorums these events may occur more frequently per quorum.

For instance, here's a great example from https://decentralizedthoughts.github.io/2020-12-12-raft-liveness-full-omission/ which demonstrates a scenario where we could have flip-flopping leadership changes.

Let's say Server 3 is the current leader. Server 4 will eventually start an election because it is unable to find the leader, causing Server 2 to transition to Unattached. Server 4 will not be able to receive enough votes to become leader, but Server 2 will be able to once its election timer expires. As Server 5 is also unable to communicate with Server 2, this kicks off a back-and-forth of leadership transitions between Server 2 and Server 3.


While network partitions may be the main issue we expect to encounter and mitigate, bugs and user error can create effects similar to network partitions that we also need to guard against: for instance, a bug that causes fetch requests to periodically time out, or setting controller.quorum.fetch.timeout.ms and other related configs too low.

Public Interfaces

We will add a new field PreVote to VoteRequests and VoteResponses to signal whether the requests and responses are for Pre-Votes. The server does not increase its epoch prior to sending a Pre-Vote request.


{
  "apiKey": 52,
  "type": "request",
  "listeners": ["controller"],
  "name": "VoteRequest",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+",
      "nullableVersions": "0+", "default": "null"},
    { "name": "Topics", "type": "[]TopicData",
      "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ReplicaEpoch", "type": "int32", "versions": "0+",
          "about": "The epoch of the prospective or candidate sending the request"},
        { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The ID of the voter sending the request"},
        { "name": "LastOffsetEpoch", "type": "int32", "versions": "0+",
          "about": "The epoch of the last record written to the metadata log"},
        { "name": "LastOffset", "type": "int64", "versions": "0+",
          "about": "The offset of the last record written to the metadata log"},
        { "name": "PreVote", "type": "bool", "versions": "1+",
          "about": "Whether the request is a PreVote request (no epoch increase) or not."}
        ...
}

{
  "apiKey": 52,
  "type": "response",
  "name": "VoteResponse",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code."},
    { "name": "Topics", "type": "[]TopicData",
      "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+"},
        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The ID of the current leader or -1 if the leader is unknown."},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The latest known leader epoch"},
        { "name": "VoteGranted", "type": "bool", "versions": "0+",
          "about": "True if the vote was granted and false otherwise"},
        { "name": "PreVote", "type": "bool", "versions": "1+",
          "about": "Whether the response is a PreVote response or not."}
        ...
}

Proposed Changes

QuorumState changes

We add a new quorum state, Prospective, for servers which are sending Pre-Vote requests, as well as new state transitions. The original and the new state transitions are listed below for comparison.

Note: This adds a new invariant that only Prospective state can transition to Candidate state.

 * Unattached|Resigned transitions to:
 *    Unattached: After learning of a new election with a higher epoch
 *    Voted: After granting a vote to a candidate
 *    Candidate: After expiration of the election timeout
 *    Follower: After discovering a leader with an equal or larger epoch
 *
 * Candidate transitions to:
 *    Unattached: After learning of a new election with a higher epoch
 *    Candidate: After expiration of the election timeout
 *    Leader: After receiving a majority of votes
 *
 * Leader transitions to:
 *    Unattached: After learning of a new election with a higher epoch
 *    Resigned: When shutting down gracefully
 *
 * Follower transitions to:
 *    Unattached: After learning of a new election with a higher epoch
 *    Candidate: After expiration of the fetch timeout
 *    Follower: After discovering a leader with a larger epoch

 * Resigned transitions to:
 *    Unattached: After learning of an election with a higher epoch, or expiration of the election timeout
 *    Follower: After discovering a leader with a larger epoch
 *
 * Unattached transitions to:
 *    Unattached: After learning of an election with a higher epoch
 *    Voted: After granting a standard vote to a candidate
 *    Prospective: After expiration of the election timeout
 *    Follower: After discovering a leader with an equal or larger epoch
 *
 * Prospective transitions to:
 *    Unattached: After learning of an election with a higher epoch, or if the node did not have a last known leader and loses/times out its election
 *    Candidate: After receiving a majority of pre-votes
 *    Follower: After discovering a leader with a larger epoch, or if the node had a last known leader and loses/times out its election
 *
 * Candidate transitions to:
 *    Unattached: After learning of a candidate with a higher epoch
 *    Prospective: After expiration of the election timeout or loss of the election
 *    Leader: After receiving a majority of standard votes
 *    Follower: After discovering a leader with an equal or larger epoch (missed in original docs)
 *
 * Leader transitions to:
 *    Unattached: After learning of a candidate with a higher epoch
 *    Resigned: When shutting down gracefully
 *
 * Follower transitions to:
 *    Unattached: After learning of a candidate with a higher epoch
 *    Prospective: After expiration of the fetch timeout
 *    Follower: After discovering a leader with a larger epoch
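
For readability, the same transition table can be restated as a small Java sketch. This is illustrative only (not the real QuorumState code); the "Voted" entries are omitted because granting a standard vote is modeled as staying in the same state with a votedKey set, as described later in this KIP.

import java.util.EnumSet;
import java.util.Set;

// Illustrative restatement of the new transition table; not the actual QuorumState code.
enum KRaftQuorumState {
    RESIGNED, UNATTACHED, PROSPECTIVE, CANDIDATE, LEADER, FOLLOWER;

    // Allowed target states per the listing above. "Voted" is omitted because granting
    // a standard vote is modeled as remaining Unattached/Prospective with a votedKey.
    Set<KRaftQuorumState> allowedTransitions() {
        switch (this) {
            case RESIGNED:    return EnumSet.of(UNATTACHED, FOLLOWER);
            case UNATTACHED:  return EnumSet.of(UNATTACHED, PROSPECTIVE, FOLLOWER);
            case PROSPECTIVE: return EnumSet.of(UNATTACHED, CANDIDATE, FOLLOWER);
            case CANDIDATE:   return EnumSet.of(UNATTACHED, PROSPECTIVE, LEADER, FOLLOWER);
            case LEADER:      return EnumSet.of(UNATTACHED, RESIGNED);
            case FOLLOWER:    return EnumSet.of(UNATTACHED, PROSPECTIVE, FOLLOWER);
            default:          throw new IllegalStateException("Unknown state: " + this);
        }
    }
}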

New ProspectiveState

A follower will now transition to Prospective instead of Candidate when its fetch timeout expires. Servers will only be able to transition to Candidate state from the Prospective state.

A Prospective server will send a VoteRequest with the PreVote field set to true and ReplicaEpoch set to its current, unbumped epoch. If at least [majority - 1] of the VoteResponses grant the vote, the server transitions to Candidate, bumps its epoch, and sends a VoteRequest with PreVote set to false (the original behavior). If enough votes are rejected or the election timeout expires, the replica transitions to either Unattached or Follower, depending on whether it knows the leader. If a Candidate fails to be elected, it transitions back to Prospective (this handles the case where a network partition occurs after Prospective transitions to Candidate).
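
As a minimal sketch of this distinction (using a hypothetical value class rather than Kafka's generated VoteRequestData), a Pre-Vote request carries the current, unbumped epoch while a standard vote request carries the epoch bumped on the transition to Candidate:

// Minimal sketch of the difference between a Pre-Vote and a standard vote request
// described above; this is a hypothetical value class, not Kafka's generated code.
final class VoteRequestSketch {
    final int replicaEpoch;      // ReplicaEpoch field
    final boolean preVote;       // PreVote field
    final long lastOffset;       // LastOffset field
    final int lastOffsetEpoch;   // LastOffsetEpoch field

    private VoteRequestSketch(int replicaEpoch, boolean preVote,
                              long lastOffset, int lastOffsetEpoch) {
        this.replicaEpoch = replicaEpoch;
        this.preVote = preVote;
        this.lastOffset = lastOffset;
        this.lastOffsetEpoch = lastOffsetEpoch;
    }

    // Prospective: canvass the quorum with the current, unbumped epoch.
    static VoteRequestSketch preVote(int currentEpoch, long lastOffset, int lastOffsetEpoch) {
        return new VoteRequestSketch(currentEpoch, true, lastOffset, lastOffsetEpoch);
    }

    // Candidate: the epoch is bumped only after a successful Pre-Vote round, and the
    // standard vote request carries the bumped epoch.
    static VoteRequestSketch standardVote(int bumpedEpoch, long lastOffset, int lastOffsetEpoch) {
        return new VoteRequestSketch(bumpedEpoch, false, lastOffset, lastOffsetEpoch);
    }
}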

We do need Prospective nodes to transition to a state that continues fetching from the bootstrap voters, so that they do not get stuck in a remove-voter situation. If a node loses connection to the leader before replicating its own removal and does not attempt to fetch, it will never make progress because the leader will not send BeginQuorum requests to it. (This scenario is also explained under the rejected alternative: "Prospective cannot transition back to Follower in the same epoch, Followers always reject PreVote requests".) For this reason, Prospective nodes transition to Unattached if they lose their election or the election times out without a known leaderId and endpoint. (Unattached conveniently serves a dual purpose here: it gives the node a chance to rediscover the leader, and it backs off before re-entering Prospective and sending more vote requests.) Prospective may also transition back to Follower if the node discovers a new leader through the vote responses, directly receives a BeginQuorum request from the leader, or fails the election but had a known leaderId and endpoint.

Prospective grants standard votes similarly to how Unattached does: if it has already granted a vote to a specific replica, it continues responding granted to that replica; if it has a known leaderId, it rejects standard vote requests; otherwise, it grants requests from replicas with up-to-date logs. Prospective "transitions" to Prospective with votedKey after granting a standard vote to another replica, similar to how Unattached transitions to Unattached with votedKey.

CandidateState changes

Candidates currently enter an exponential backoff on election timeout or on receiving a rejected majority of vote responses. We will no longer enter exponential backoff on election timeout.

On election timeout (the vote is neither granted nor rejected before randomElectionTimeoutMs passes), the exponential aspect of the backoff was beneficial for gradually rate-limiting disruptive epoch increases when a Candidate is consistently unable to reach enough of the quorum for a granted or rejected vote. However, a Candidate can now transition immediately to Prospective, which acts as a buffer against another disruptive epoch increase. If the election timed out while the replica was a Candidate, it had not received enough vote responses from the quorum within randomElectionTimeoutMs. As Prospective, one of the following can then happen:

  • If nothing changes and the replica is still unable to receive enough vote responses from the quorum before randomElectionTimeoutMs, it will not increase its epoch.
  • If the Pre-Vote is rejected, the replica will not increase its epoch and will transition to Unattached or Follower in an attempt to reach the leader.
  • If the Pre-Vote is granted (which indicates the replica can communicate with at least a majority of the quorum), the replica transitions to Candidate with the disruptive epoch bump. We cannot assume the new election will be won, but we have a good indication that the replica can communicate with a majority of the quorum and that the majority would grant the vote.

For the scenario of receiving a majority of rejected votes, it also makes sense for Candidate state to back off or to wait out the remainder of the random election timeout (as suggested by the Raft paper). However, we arguably do not need an exponentially increasing backoff: the Candidate transitions to Prospective on loss of the election, which already provides a buffer against another disruptive epoch increase. Keeping the exponential backoff behavior adds bloat to Prospective state and unneeded complexity (e.g. tracking the number of times a replica has bounced between Candidate and Prospective, and an exponential calculation that is hard to read). We take changing the backoff behavior in this scenario as out of scope, since it is not immediately obvious what a better alternative would be (e.g. a smaller uniformly random election backoff, which would mean deprecating the max election timeout config, or waiting out the rest of the random election timeout, which could mean longer unavailability of the quorum).
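
A hedged sketch of the Candidate poll decision described in this section, assuming simplified inputs (majorityGranted, majorityRejected, electionTimeoutExpired) rather than the real CandidateState bookkeeping:

// Simplified model of the Candidate step under this proposal; illustrative names only.
final class CandidateStep {

    enum Next { CANDIDATE, PROSPECTIVE, LEADER }

    static Next poll(boolean majorityGranted,
                     boolean majorityRejected,
                     boolean electionTimeoutExpired) {
        if (majorityGranted) {
            return Next.LEADER;
        }
        // Previously: start an exponential backoff and then re-enter Candidate with
        // another epoch bump. Now: fall back to Prospective in both cases, which
        // itself gates any further epoch bump behind a successful Pre-Vote round.
        if (majorityRejected || electionTimeoutExpired) {
            return Next.PROSPECTIVE;
        }
        return Next.CANDIDATE; // still collecting standard vote responses
    }
}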

FollowerState changes

Followers now track votedKey. This change is not strictly required by the KIP, but we should not drop persisted state during quorum state transitions within the same epoch. (In the past, we would lose this information on transitions from Unattached with votedKey to Follower in the same epoch.) Now, a transition from Prospective with votedKey to Follower in the same epoch is also possible.

ResignedState changes

Resigned voters used to transition directly to Candidate after waiting an election timeout (observers would transition to UnattachedState with epoch + 1). If we simply replaced transitionToCandidate with transitionToProspective, a cordoned-off leader in epoch 5 could resign in epoch 5, transition to Prospective in epoch 5 (with leaderId=localId), fail the election, and then attempt to become a follower of itself in epoch 5. To address this, Resigned must increase its epoch when it transitions.

We can simplify the transition further to have Resigned always transition to Unattached with epoch + 1 after the election timeout (no matter if it is a voter or observer), and have transitionToUnattached initialize the new electionTimeoutMs to the resignedState's remainingElectionTimeoutMs if it is a voter. This effectively causes Resigned voters to transition immediately to Prospective after an election timeout. 

(For more discussion about alternatives and why this option was chosen, see https://github.com/apache/kafka/pull/18240#discussion_r1899341945)
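
The sketch below models the Resigned handling described above, assuming a hypothetical QuorumTransitions collaborator; the real method and field names may differ, and the fresh timeout for observers is an assumption based on existing behavior.

// Illustrative sketch of the Resigned handling described above; not the real code.
final class ResignedHandling {

    static void onElectionTimeout(int resignedEpoch,
                                  boolean isVoter,
                                  long remainingElectionTimeoutMs,
                                  long freshElectionTimeoutMs,
                                  QuorumTransitions quorum) {
        // Always bump the epoch so the node cannot try to follow itself in the
        // epoch it just resigned from.
        int newEpoch = resignedEpoch + 1;

        // Voters reuse the already-elapsed timer so they move to Prospective
        // essentially immediately; observers wait out a fresh timeout (assumption).
        long timeoutMs = isVoter ? remainingElectionTimeoutMs : freshElectionTimeoutMs;
        quorum.transitionToUnattached(newEpoch, timeoutMs);
    }

    // Hypothetical collaborator used only to make the sketch self-contained.
    interface QuorumTransitions {
        void transitionToUnattached(int epoch, long electionTimeoutMs);
    }
}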

Observers

Just as Observers cannot transition to Candidate, they cannot transition to Prospective.

Pre-Vote request handling

Nodes in Leader state

These nodes will reject Pre-Vote requests. Note that if the epoch in the Pre-Vote request is greater than theirs, they will transition to Unattached state first (and then vote according to the details covered in the next section).

Nodes in Unattached, Prospective, Candidate, Resigned state

These nodes will grant the Pre-Vote if the requesting replica's log is at least as long as theirs. Since Pre-Votes are not binding or persisted, it is possible and safe for them to grant multiple Pre-Votes if multiple come their way. Even if they have already granted a standard vote to a replica (votedKey is non-empty), they can continue to grant Pre-Votes. 

Nodes in Follower state

These nodes will deny Pre-Votes if they have fetched from the leader successfully at least once. Otherwise they can grant Pre-Votes given that the log length condition is met.
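
Putting the request-handling rules from the last three subsections together, a simplified model (not the actual request handler) might look like:

// Simplified sketch of the Pre-Vote granting rules described above; a model of the
// behavior in this KIP, not the actual request handler.
final class PreVoteGranting {

    enum LocalState { LEADER, FOLLOWER, UNATTACHED, PROSPECTIVE, CANDIDATE, RESIGNED }

    static boolean grantPreVote(LocalState state,
                                boolean hasFetchedFromLeader,  // relevant for Follower only
                                boolean requesterLogUpToDate) {
        switch (state) {
            case LEADER:
                // An active leader never grants Pre-Votes (a higher-epoch request
                // would first move it to Unattached, handled by the default branch).
                return false;
            case FOLLOWER:
                // Deny once we have successfully fetched from the leader at least
                // once; otherwise fall back to the log comparison.
                return !hasFetchedFromLeader && requesterLogUpToDate;
            default:
                // Unattached, Prospective, Candidate, Resigned: Pre-Votes are not
                // binding or persisted, so grant purely on log comparison, even if
                // a standard vote (votedKey) was already granted to someone else.
                return requesterLogUpToDate;
        }
    }
}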

Pre-Vote response handling

When a node receives Pre-Vote responses, it does the following (a sketch of these steps follows the list):

  • First check if the response received indicates PreVote=true. If we were to receive PreVote=false, that would indicate the recipient of the vote request supported an old version of the Vote API. In this case we transition to Candidate state immediately (default to old behavior).
  • Check if we are in Candidate state. If so, we ignore the response as we've presumably already received enough votes to become candidate.
  • Transition to Candidate state if we have received enough granted votes.
  • If we have received enough rejected votes to lose the election...
    • if we did not have a last known leaderId and endpoint, we will transition to Unattached to see if we can discover the leader via fetch requests to the bootstrap voters.
    • if we did have a last known leaderId and endpoint, we will transition to Follower and attempt to fetch from that leader.
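
A simplified sketch of these steps, with illustrative names rather than the actual KafkaRaftClient implementation:

// Sketch of the response-handling steps listed above; names are illustrative.
final class PreVoteResponseHandling {

    enum Action { BECOME_CANDIDATE, BECOME_FOLLOWER, BECOME_UNATTACHED, IGNORE, WAIT }

    static Action onVoteResponse(boolean responsePreVoteFlag,
                                 boolean alreadyCandidate,
                                 boolean enoughGranted,
                                 boolean enoughRejected,
                                 boolean hasKnownLeader) {
        // 1. PreVote=false in the response means the responder only speaks an
        //    older Vote version; fall back to the old behavior right away.
        if (!responsePreVoteFlag) {
            return Action.BECOME_CANDIDATE;
        }
        // 2. Late Pre-Vote responses after we already became Candidate are ignored.
        if (alreadyCandidate) {
            return Action.IGNORE;
        }
        // 3. Enough granted Pre-Votes: start the real (epoch-bumping) election.
        if (enoughGranted) {
            return Action.BECOME_CANDIDATE;
        }
        // 4. Lost the Pre-Vote round: fetch from the known leader if we have one,
        //    otherwise rediscover the leader via the bootstrap voters.
        if (enoughRejected) {
            return hasKnownLeader ? Action.BECOME_FOLLOWER : Action.BECOME_UNATTACHED;
        }
        return Action.WAIT;
    }
}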

Metrics

Addition of prospective and prospective-voted values for the current-state metric.

A prior change removed the Voted state and folded it into the Unattached state, but did not update the current-state metric to match. Since the Prospective state can also include voted state, we rename voted to unattached-voted so that we can add prospective and prospective-voted values for Pre-Vote.
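
A small illustrative sketch of how the current-state metric value could be derived after this change; only the state names mentioned in this section are assumed.

// Illustrative derivation of the current-state metric value; not the actual metrics code.
final class CurrentStateMetric {
    static String value(String state, boolean hasVotedKey) {
        switch (state) {
            case "unattached":  return hasVotedKey ? "unattached-voted" : "unattached";
            case "prospective": return hasVotedKey ? "prospective-voted" : "prospective";
            default:            return state; // e.g. leader, candidate, follower, resigned
        }
    }
}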

Benefits of Pre-Vote

How does this prevent unnecessary leadership loss?

We prevent servers from increasing their epoch prior to establishing they can win an election. 

Can Pre-Vote prevent a quorum from electing a leader?

Yes, Pre-Vote needs an additional safeguard to prevent scenarios where eligible leaders cannot be elected.

If a leader is unable to send FETCH responses to [majority - 1] of servers, no new metadata can be committed and we will need a new leader to make progress. We may need the minority of servers which are able to communicate with the leader to grant their vote to prospectives which can communicate with a majority of the cluster. Without Pre-Vote, the epoch bump would have forced servers to participate in the election. With Pre-Vote, the minority of servers which are connected to the leader will not grant Pre-Vote requests. This is the reason why an additional "Check Quorum" safeguard is needed which is what KAFKA-15489 implements. Check Quorum ensures a leader steps down if it is unable to send FETCH responses to a majority of servers. This will free up all servers to grant their votes to eligible prospectives.
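
For context, here is a hedged sketch of the Check Quorum idea from KAFKA-15489, as a simplified model in which the leader resigns if it has not served fetches from enough distinct voters within a timeout; the class, fields, and timeout name are assumptions, not the actual implementation.

import java.util.HashSet;
import java.util.Set;

// Simplified model of Check Quorum: the leader tracks which voters have fetched
// recently and resigns if it cannot confirm a majority within the timeout window.
final class CheckQuorumTracker {
    private final int voterCount;
    private final long checkQuorumTimeoutMs;
    private final Set<Integer> fetchedVoters = new HashSet<>();
    private long timerStartMs;

    CheckQuorumTracker(int voterCount, long checkQuorumTimeoutMs, long nowMs) {
        this.voterCount = voterCount;
        this.checkQuorumTimeoutMs = checkQuorumTimeoutMs;
        this.timerStartMs = nowMs;
    }

    // Called by the leader whenever it successfully handles a FETCH from a voter.
    void recordFetch(int voterId, long nowMs) {
        fetchedVoters.add(voterId);
        // The majority includes the leader itself, so [majority - 1] distinct fetchers suffice.
        int majority = voterCount / 2 + 1;
        if (fetchedVoters.size() >= majority - 1) {
            fetchedVoters.clear();
            timerStartMs = nowMs; // quorum confirmed, restart the window
        }
    }

    // If this returns true, the leader should resign so other voters can be elected.
    boolean shouldResign(long nowMs) {
        return nowMs - timerStartMs >= checkQuorumTimeoutMs;
    }
}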

Why do we need Followers to reject Pre-Vote requests? Shouldn't the Pre-Vote and Check Quorum mechanism be enough to prevent disruptive servers?

If servers are still within their fetch timeout, this means they have recently heard from a leader. It can be less disruptive if they refuse to vote for a new leader while still following an existing one.

The following scenarios show why just Pre-Vote and Check Quorum (without Followers rejecting Pre-Votes) are not enough.

  • Scenario A: We can imagine a scenario where two servers (S1 & S2) are both up-to-date on the log but unable to maintain a stable connection with each other. Let's say S1 is the leader. When S2 loses connectivity with S1 and is unable to find the leader, it will start a Pre-Vote. Since its log may be up-to-date, a majority of the quorum may grant the Pre-Vote and S2 may then start and win a standard vote to become the new leader. This is already a disruption, since S1 was serving the majority of the quorum well as a leader. When S1 loses connectivity with S2 again, it will start an election, and this bouncing of leadership could continue.
  • Scenario B: A server in an old configuration (e.g. S1 in the below diagram, pg 41 of the Raft paper) starts a “pre-vote” when the leader is temporarily unavailable, and is elected because it is as up-to-date as the majority of the quorum. We cannot technically rely on the original leader replicating fast enough to remove S1 from the quorum - we can imagine some bug/limitation with quorum reconfiguration causing S1 to continuously try to start elections while the leader is trying to remove it from the quorum. This scenario will be covered by KIP-853: KRaft Controller Membership Changes or future work if not covered here.

Note: nodes can ping-pong between Prospective and Follower state if we blindly have Followers reject Pre-Vote requests. As explained under Pre-Vote request handling, Followers may grant Pre-Votes under the specific condition that they have not yet been able to fetch from the leader successfully.

Compatibility

We currently use ApiVersions to gate newer versions of Raft APIs from being used before all servers can support them. This is useful in the upgrade scenario for Pre-Vote: a server's network client will only send the version of the Vote request supported by the intended recipient. If we attempt to serialize the PreVote field with a non-default value of true at a version that does not support the field, the NetworkClient will return a VoteResponse with an UnsupportedVersion error code. When the Prospective receives this response, it knows the recipient does not support Pre-Vote and immediately transitions to Candidate state, which is the old behavior.

If the Prospective node receives and processes a majority of granted Pre-Vote responses before hitting a node which does not support Pre-Vote, it can also transition to Candidate state. Otherwise, it will transition to Candidate state once it processes an UnsupportedVersion response and will send standard vote requests to all servers. (Any Pre-Vote responses received while in Candidate state are ignored, since the request manager clears out pending requests as part of quorum state transitions.)

Test Plan

This will be tested with unit tests, integration tests, system tests, and TLA+. 

Rejected Alternatives

Not adding a new quorum state for Pre-Vote

Adding a new state should keep the logic for existing states closer to their original behavior and prevent overcomplicating them. This could aid in debugging as well since we know definitively that servers in Prospective state are sending Pre-Votes, while servers in Candidate state are sending standard votes. 

Rejecting VoteRequests received within fetch timeout (w/o Pre-Vote) 

This was originally proposed in the Raft paper as a necessary safeguard to prevent Scenario A from occurring, but we can see how this could extend to cover all the other disruptive scenarios mentioned.

  • When a partitioned server rejoins and forces the cluster to participate in an election …

    • if a majority of the cluster is not receiving fetch responses from the current leader, they consider the vote request and make the appropriate state transitions. An election would be needed in this case anyway.

    • if the rest of the cluster is still receiving fetch responses from the current leader, they reject the vote request from the disruptive follower. No one transitions to a new state (e.g. Unattached) as a result of the vote request, current leader is not disrupted.

  • For a server in an old configuration (that’s not in the new configuration) …

    • if the current leader is still responding to fetch requests in a reasonable amount of time, the server is prevented from starting and winning elections, which would delay reconfiguration.

    • if the current leader is not responding to fetch requests, then the server could still win an election (this scenario calls for an election anyway). KIP-853 should cover preventing this case if necessary.

  • For a server w/ new disk/data loss …

    • if the current leader is still responding to fetch requests in a reasonable amount of time, the server is prevented from starting and winning elections, which could lead to loss of committed data.

    • if the current leader is not responding to fetch requests, we can reject VoteRequests from the server w/ new disk/data loss if it isn't sufficiently caught up on replication. KIP-853 should cover this case w/ storage ids if necessary.

However, this would not be a good standalone alternative to Pre-Vote because once a server starts a disruptive election (disruptive in the sense that the current leader still has majority), its epoch may increase while none of the other servers' epochs do. The most likely way for the server to rejoin the quorum now with its inflated epoch would be to win an election. 

Separate RPC for Pre-Vote

This would be added toil with no real added benefits. Since a Pre-Vote and a standard Vote are similar in concept, it makes sense to cover both with the same RPC. We can add clear logging and metrics to easily differentiate between Pre-Vote and standard Vote requests.

Prospective cannot transition back to Follower in the same epoch, Followers always reject PreVote requests 

The suggestion here was to not allow Prospective to transition back to Follower unless the node receives a BeginQuorum request from the Leader or discovers a Leader with a larger epoch through any other response type (e.g. a PreVote or Fetch response). We would require the larger epoch when the discovery does not come from a BeginQuorum request due to an edge case where nodes flip-flop between Prospective and Follower state, illustrated by Jack Vanlightly:

If Leader Node 1 loses connectivity with Node 3 first and Node 3 transitions to Prospective, Node 3 may receive a Pre-Vote response from Node 2 that still specifies Node 1 as Leader. If Node 3 then transitions back to Follower with the same epoch, and Node 2 finally times out and becomes Prospective, the cycle continues. Essentially, even though a Follower has a leaderId in memory, there is no guarantee it was ever able to fetch from that leader since the time it became a Follower.

Why even allow the transition from Prospective back to Follower? This is due to another edge case with removed voters from Jose Sancio:

  1. Assume voters set (1, 2, 3) is replicated to all of the voters (1, 2, 3) and the leader is 1.
  2. Remove 3 from voters set while 3 loses connection to leader 1.
  3. Voter set 1, 2 gets replicated to replica 2 and 1 but not 3.
  4. Replica 3 assumes it is in the voter set, its fetch timer expires, and it transitions to Prospective.
  5. Assume now that replica 3 would be able to send fetch requests to leader 1.

How do we get replica 3 back to fetching from the leader? This is what we know:

  1. The leader 1 won't send BeginQuorumEpoch to replica 3 because it is not in its voter set.
  2. Replica 3 thinks it is in the voter set so it will stay in Prospective trying to win an election.
  3. We need to have a mechanism for replica 3 to discover that the leader is back to accepting Fetch requests.

One solution would be for replicas in Prospective state to transition to Follower only if they receive a BeginQuorum request directly from the leader or find the leader through a response with a larger epoch; otherwise they continue fetching from the last known leader/bootstrap voters. The other solution (which we decided to take) is for replicas in Prospective state to transition to Follower as long as they receive any response with an equal or greater epoch denoting a leader, but to only reject PreVote requests if they have managed to successfully fetch from the leader at least once. Both solutions are viable, but the first involves a more complex and disruptive implementation (fetch-related request handling would have to allow the transition to Follower only conditionally), while the second is also more intuitive to understand.

Covering disk loss scenario in scope

This scenario shares similarities with adding new servers to the quorum, which KIP-853: KRaft Controller Membership Changes would handle. If a server loses its disk and fails to fully catch up to the leader prior to another server starting an election, it may vote for any server whose log is at least as caught up as its own (which might be less caught up than the last leader). One way to handle this is to add logic preventing servers with new disks (determined via a unique storage id) from voting prior to sufficiently catching up on the log. Another way is to reject Pre-Vote requests from these servers. We leave this scenario to be covered by KIP-853 or future work because of the similarities with adding new servers. The following timeline illustrates the data loss scenario:

| Time | Server 1 | Server 2 | Server 3 |
| --- | --- | --- | --- |
| T0 | Leader with majority of quorum (Server 1, Server 3) caught up with its committed data | Lagging follower | Follower |
| T1 |  |  | Disk failure |
| T2 | Leader → Unattached state | Follower → Unattached state | Comes back up w/ new disk, triggers an election before catching up on replication |
| T3 |  |  | Will not be elected |
| T4 |  | Election ms times out and starts an election |  |
| T5 |  | Votes for Server 2 | Votes for Server 2 |
| T6 |  | Elected as leader leading to data loss |  |
