You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 11 Next »

Status

Current state: Draft

Discussion thread

JIRA:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Note this is a joint work proposed by  Boyang Chen, Guozhang Wang, and Jason Gustafson

Motivation

This proposal follows KIP-500. Log replication is at the heart of consensus protocols such as Raft. It is also at the heart of Kafka. The replication protocol that we spent years improving and validating is actually not too different from Raft if you take a step back. There is a single leader at any time which is enforced by a monotonically increasing epoch; messages are unique by offset and epoch; and there is a protocol to reconcile log inconsistencies following leader changes using offset and epoch information from the log. The gaps between Kafka's replication protocol and Raft are the following:

  1. Quorum commit semantics: messages in Kafka are considered committed after replication to the current ISR, which is maintained in Zookeeper.
  2. Leader election: in Kafka, leader elections are done by the controller which writes the updated state to Zookeeper.

Essentially we have relied on Zookeeper's consensus protocol up now to enforce consistent replication semantics. With KIP-500, we are taking a step into the wild. This proposal is about bridging the gaps in the Kafka replication protocol in order to support the controller's metadata quorum. 

The protocol we are proposing is a sort of Raft dialect which is heavily influenced by Kafka's log replication protocol. For example, it is pull-based unlike Raft which is push-based. This adds some complication, but mostly it's just looking at Raft through a mirror. We also have favored Kafka terminology (offset/epoch) over traditional Raft terminology (index/term). Think of the protocol as beginning with Kafka log replication and adding Raft leader election. 

Note that this protocol assumes something like the Kafka v2 log message format. It is not compatible with older formats because records do not include the leader epoch information that is needed for log reconciliation. We have intentionally avoided any assumption about the representation of the log and its semantics. This makes it usable both for internal metadata replication and (eventually) partition data replication.

Key Concepts

If you are familiar with Kafka replication, most of the same concepts apply; however we replace ISR commit semantics with quorum commit semantics and add leader election.

Leader Epoch: Known as a "term" in Raft, this is a monotonically increasing sequence which is incremented during every leader election. This is used both to fence zombies and to reconcile logs which have diverged. Log records are uniquely identified by the offset in the log and the epoch of the leader that did the append.

High watermark: Although there is no notion of an ISR for a quorum-based replication protocol, we still have a high watermark to control when a record is considered "committed." This is the largest offset which is replicated to a majority of the voters. The protocol is designed to guarantee that committed records are not lost.

Voter: A voter is a replica which is eligible to cast votes during an election and potentially become a leader.

Candidate: When an election is started a voter will bump the leader epoch and cast a vote for itself. We refer to these as candidates.

Observer: An observer is a replica which is not eligible to vote and cannot become a leader. It is only responsible for discovering the leader and replicating the log.

State Machine

Configurations

  • bootstrap.servers:  Defines the set of servers to contact to get quorum information. This does not have to address quorum members directly. For example, it could be a VIP.
  • quorum.voters: Defines the ids of the expected voters. This is only required when bootstrapping the cluster for the first time. As long as the cluster (hence the quorum) has started then new brokers would rely on FindQuorum (described below) to discover the current voters of the quorum. 
  • quorum.progress.timeout.ms: Maximum time without received fetch records request from a majority of the quorum before asking if there's a new epoch leader via FindQuorum.
  • quorum.fetch.timeout.ms: Maximum time without a successful fetch from the current leader before a new election is started.
  • quorum.election.timeout.ms: Maximum time without collected a majority of votes during the candidate state before a new election is retried.
  • quorum.election.jitter.max.ms: Maximum random jitter after an election timeout before a new election is triggered.
  • quorum.request.timeout.ms: Maximum time before a pending request is considered failed and the connection is dropped.
  • quorum.retry.backoff.ms: Initial delay between request retries.
  • quorum.retry.backoff.max.ms: Max delay between requests. Backoff will increase exponentially beginning from quorum.retry.backoff.ms (the same as in KIP-580).
  • broker.id: The existing broker id config shall be used as the voter id in the Raft quorum.

Persistent State

This proposal requires a persistent log as well as a separate file to maintain the current quorum state. Below we define the structure of this state.

Log Structure

We assume a normal log structure for the metadata log (which we may also refer to as the "metadata topic"). The protocol we define in this proposal is agnostic to the record format, but we assume the following fields at a minimum for individual records:

Record => Offset LeaderEpoch ControlType Key Value Timestamp 

Records are uniquely defined by their offset in the log and the epoch of the leader that appended the record. The key and value schemas will be defined by the controller in a separate KIP; here we treat them as arbitrary byte arrays. However, we do require the ability to append "control records" to the log which are reserved only for use within the Raft quorum (e.g. this enables quorum reassignment).

Kafka's current v2 message format version supports everything we need, so we will assume that.

Quorum State 

We use a separate file to store the current state of the quorum. This is both for convenience and correctness. It helps us to initialize the quorum state after a restart, but we also need it in order to know which broker we have voted for in a given election. The Raft protocol does not allow voters to change their votes, so we have to preserve this state across restarts. Below is the schema for this quorum-state file.

{
  "type": "message",
  "name": "QuorumStateMessage",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      {"name": "ClusterId", "type": "string", "versions": "0+"},
	  {"name": "LeaderId", "type": "int32", "versions": "0+"},
      {"name": "LeaderEpoch", "type": "int32", "versions": "0+"},
      {"name": "VotedId", "type": "int32", "versions": "0+"},
      {"name": "AppliedOffset", "type": "int64", "versions": "0+"},
      {"name": "CurrentVoters", "type": "[]Voter", "versions": "0+", "fields": [
        {"name": "VoterId", "type": "int32", "versions": "0+"}
      ]},
      {"name": "TargetVoters", "type": "[]Voter", "versions": "0+", "nullableVersions": "0+", "fields": [
        {"name": "VoterId", "type": "int32", "versions": "0+"}
      ]}
  ]
}


Below we define the purpose of these fields:

  • ClusterId: the clusterId, which is persisted in the log and used to ensure that we do not mistakenly interact with the wrong cluster.
  • LeaderId: this is the last known leader of the quorum. A value of -1 indicates that there is no leader.
  • LeaderEpoch: this is the last known leader epoch. This is initialized to 0 when the quorum is bootstrapped and should never be negative.
  • VotedId: indicates the id of the broker that this replica voted for in the current epoch. A value of -1 indicates that the replica has not (or cannot) vote.
  • AppliedOffset: Reflects the maximum offset that has been applied to this quorum state. This is used for log recovery. The broker must scan from this point on initialization to detect updates to this file.
  • CurrentVoters: the latest known set of voters for this quorum.
  • TargetVoters: the latest known target voters if the quorum is being reassigned.

The use of this file will be described in more detail below as we describe the protocol. Note one key difference of this internal topic compared with other topics is that we should always enforce fsync upon appending to local log to guarantee Raft algorithm correctness.

Additionally, we make use of the current meta.properties file, which caches the value from broker.id in the configuration as well as the discovered clusterId. As is the case today, the configured broker.id does not match the cached value, then the broker will refuse to start. Similarly, the cached clusterId is included when interacting with the quorum APIs. If we connect to a cluster with a different clusterId, then the broker will receive a fatal error and shutdown.

Leader Election and Data Replication

The key functionalities of any consensus protocol are leader election and data replication. The protocol for these two functionalities consists of 5 core RPCs:

  • Vote: Sent by a voter to initiate an election.
  • BeginQuorumEpoch: Used by a new leader to inform the voters of its status.
  • EndQuorumEpoch: Used by a leader to gracefully step down and allow a new election.
  • FetchQuorumRecords: Sent by voters and observers to the leader in order to replicate the log.
  • FindQuorum: Used to discover or view current quorum state when bootstrapping a broker.

There are also administrative APIs:

  • AlterQuorum: Administrative API to change the members of the quorum.
  • DescribeQuorum: Administrative API to list the replication state (i.e. lag) of the voters.

Before getting into the details of these APIs, there are a few common attributes worth mentioning upfront:

  1. The core requests have a field for the clusterId. This is validated when a request is received to ensure that misconfigured brokers will not cause any harm to the cluster.
  2. All requests save Vote have a field for the leader epoch. Voters and leaders are always expected to ensure that the request epoch is consistent with its own and return an error if it is not.
  3. We piggyback current leader and epoch information on all responses. This reduces latency to discover a leader change.

Note: This protocol is only concerned with leader election and log replication. It does not specify how new log entries are appended to the leader's log. Typically this would be through specific metadata APIs. For example, KIP-497 adds an AlterISR API. When the leader of the metadata quorum (i.e. the controller) receives an AlterISR request, it will append an entry to its log.

Below we describe the schema and behavior of each of these APIs. 

Vote

The Vote API is used by voters to hold an election. As mentioned above, the main difference from Raft is that this protocol is pull-based. Voters send fetch requests to the leaders in order to replicate from the log. These fetches also serve as a liveness check for the leader. If a voter perceives a leader as down, it will hold a new election and declare itself a candidate. A voter will begin a new election under three conditions:

  1. If it fails to receive a FetchQuorumRecordsResponse from the current leader before expiration of quorum.fetch.timeout.ms
  2. If it receives a EndQuorumEpoch request from the current leader
  3. If it fails to receive a majority of votes before expiration of quorum.election.timeout.ms after declaring itself a candidate.

A voter triggers an election by first voting for itself and updating the quorum-state file. Once this state is persistent, the voter becomes a candidate and sends a VoteRequest to all the other voters. Note that a candidate cannot change its vote once cast for itself.

Note that new elections are always delayed by a random time which is bounded by quorum.election.jitter.max.ms. This is part of the Raft protocol and is meant to prevent gridlocked elections. For example, with a quorum size of three, if only two voters are online, then we have to prevent repeated elections in which each voter declares itself a candidate and refuses to vote for the other.

Request Schema

{
  "apiKey": N,
  "type": "request",
  "name": "VoteRequest",
  "validVersions": "0",
  "flexibleVersion": "0+",
  "fields": [
      {"name": "ClusterId", "type": "string", "versions": "0+"},
      {"name": "CandidateEpoch", "type": "int32", "versions": "0+",
       "about": "The bumped epoch of the candidate sending the request"},
      {"name": "CandidateId", "type": "int32", "versions": "0+",
       "about": "The ID of the voter sending the request"},
      {"name": "LastEpoch", "type": "int32", "versions": "0+",
       "about": "The epoch of the last record written to the metadata log"},
      {"name": "LastEpochEndOffset", "type": "int64", "versions": "0+",
       "about": "The offset of the last record written to the metadata log"}
  ]
}

Response Schema

{
  "apiKey": N,
  "type": "response",
  "name": "VoteResponse",
  "validVersions": "0",
  "fields": [
      {"name": "ErrorCode", "type": "int16", "versions": "0+"},
      {"name": "LeaderId", "type": "int32", "versions": "0+",
       "about": "The ID of the current leader or -1 if the leader is unknown."},
      {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
       "about": "The latest known leader epoch"},
      {"name": "VoteGranted", "type": "bool", "versions": "0+"
       "about": "True if the vote was granted and false otherwise"}
  ]
}

Vote Request Handling

Note we need to respect the extra condition on leadership: the candidate’s log is at least as up-to-date as any other log in the majority who vote for it. Raft determines which of two logs is more "up-to-date" by comparing the offset and epoch of the last entries in the logs. If the logs' last entry have different epochs, then the log with the later epoch is more up-to-date. If the logs end with the same epoch, then whichever log is longer is more up-to-date. The Vote request includes information about the candidate’s log, and the voter denies its vote if its own log is more up-to-date than that of the candidate.

When a voter decides to become a candidate and ask for others to vote for it, it will increment its current leader epoch as CandidateEpoch.

When a voter handles a Vote request:

  1. First it checks whether an epoch larger than the candidate epoch from the request is known. If so, the vote is rejected.
  2. It checks if it has voted for that candidate epoch already. If it has, then only grant the vote if the candidate id matches the id that was already voted. Otherwise, the vote is rejected.
  3. If the candidate epoch is larger than the currently known epoch:
    1. Check whether CandidateId is one of the expected voters. If not, then reject the vote. The candidate in this case may have been part of an incomplete AlterQuorum change, so the voter should accept the epoch bump and itself begin a new election.
    2. Check that the candidate's log is at least as up-to-date as it (see above for the comparison rules). If yes, then grant that vote by first updating the quorum-state file, and then returning the response with voteGranted to yes; otherwise rejects that request with the response.

Also note that a candidate always votes for itself at the current candidate epoch. That means, it will also need to update the quorum-state file as "voting for myself" before sending out the vote requests. On the other hand, if it receives a Vote request with a larger candidate epoch, it can still grants that vote while at the same time transiting back to voter state because a newer leader may has been elected for a newer epoch.

New Error Codes

LARGER_KNOWN_EPOCH(201, "The voter has seen larger epoch during election", LargerKnownEpochException::new),
LARGER_END_OFFSET(202, "The voter has seen larger end offset with the given epoch during election", LargerEndOffsetException::new);

Vote Response Handling

When receiving a Vote response:

  1. First we need to check if the voter is still a candidate because it may already observed an election with a larger epoch; if it is no longer in "candidate" state, just ignore the response.
  2. Otherwise, check if it has accumulated majority of votes for this epoch – this information does not need to be persisted, since upon failover it can just resend the vote request and the voters would just grant that request again as long as there's no newer epoch candidate – and if yes, it can transit to the leader state by updating the quorum-state file indicating itself as the leader for the new epoch. Otherwise, just record this vote in memory and do nothing.
  3. Upon becoming the leader, it would also start by writing the current assignment state to its log so that its LastEpoch and LastEpochOffset are updated, and then after that it can start sending out BeginQuorumEpoch requests. 

Writing the dummy entry with the new epoch is not necessary for correctness, but as an optimization to reduce the client request latency. Basically it allows for faster advancement of the high watermark following a leader election (the need for this is discussed in more detail below in the handling of FetchQuorumRecords). The dummy record will be a control record with Type=3. Below we define the schema: 

{
  "type": "message",
  "name": "LeaderChangeMessage",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
	  {"name": "LeaderId", "type": "int32", "versions": "0+",
       "about": "The ID of the newly elected leader"},
      {"name": "VotedIds", "type": "[]int32", "versions": "0+",
       "about": "The IDs of the voters who voted for the current leader"},

  ]
}

Also note that unlike other Kafka topic partition data whose log appends are persisted asynchronously, for this special quorum topic all log appends must be synced to FS before returning.

Note on Gridlocked Elections:  It is possible that an election fails because each voter votes for itself. Or, with an even number of voters active, it is possible that a vote ends up split. Generally if a voter fails to get a majority of votes before quorum.election.timeout.ms, then the vote is deemed to have failed, which will cause the candidate to bump the epoch, step down, and backoff according to quorum.election.jitter.max.ms before retrying. Under some situations, a candidate can immediately detect when a vote has failed. For example, if there are only two voters and a candidate fails to get a vote from the other voter (i.e. VoteGranted is returned as false in the VoteResponse), then there is no need to wait for the election timeout. The candidate in this case can immediately step down and backoff.

BeginQuorumEpoch

In traditional Raft, the leader will send an empty Append request to the other nodes in the quorum in order to assert its term. As a pull-based protocol, we need a separate BeginQuorumEpoch API to do the same in order to ensure election results are discovered quickly. In this protocol, once a leader has received enough votes, it will send the BeginQuorumEpoch request to all voters in the quorum.

Note that only voters receive the BeginQuorumEpoch request: observers will discover the new leader through either the FindQuorum or FetchQuorumRecords APIs. For example, the old leader would return an error code in FetchQuorumRecords response indicating that it is no longer the leader and it will also encode the current known leader id / epoch as well, then the observers can start fetching from the new leader. In case the old leader does not know who's the new leader, observers can still fallback to FindQuorum request to discover the new leader.

TODO: If we piggyback the leader ID on some of the APIs, we may need to add connection information as well. Otherwise, if the hostname changes after a restart, a voter may be stuck with stale information.

Request Schema

{
  "apiKey": N,
  "type": "request",
  "name": "BeginQuorumEpochRequest",
  "validVersions": "0",
  "fields": [
      {"name": "ClusterId", "type": "string", "versions": "0+"},
      {"name": "LeaderId", "type": "int32", "versions": "0+",
       "about": "The ID of the newly elected leader"},
      {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
       "about": "The epoch of the newly elected leader"}
  ]
}

Response Schema

{
  "apiKey": N,
  "type": "response",
  "name": "BeginQuorumEpochResponse",
  "validVersions": "0",
  "fields": [
      {"name": "ErrorCode", "type": "int16", "versions": "0+"},
      {"name": "LeaderId", "type": "int32", "versions": "0+",
       "about": "The ID of the current leader or -1 if the leader is unknown."},
      {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
       "about": "The latest known leader epoch"}
  ]
}

BeginQuorumEpoch Request Handling

A voter will accept a BeginQuorumEpoch if its leader epoch is greater than or equal to the current known epoch so long as it doesn't conflict with previous knowledge. For example, if the leaderId for epoch 5 was known to be A, then a voter will reject a BeginQuorumEpoch request from a separate voter B from the same epoch. If the epoch is less than the known epoch, the request is rejected. 

As soon as a broker accepts a BeginQuorumEpoch request, it will transition to a follower state and begin sending FetchQuorumRecords requests to the new leader.

BeginQuorumEpoch Response Handling

If the response contains no errors, then the leader will record the follower in memory as having endorsed the election. The leader will continue sending BeginQuorumEpoch to each known voter until it has received its endorsement. This ensures that a voter that is partitioned from the network will be able to discover the leader quickly after the partition is restored. An endorsement from an existing voter may also be inferred through a received FetchQuorumRecords request with the new leader's epoch even if the BeginQuorumEpoch request was never received.

If the error code indicates that voter's known leader epoch is larger, then the voter will update quorum-state and become a follower of that leader and begin sending FetchQuorumRecords requests.

We shall reuse the same LARGER_LEADER_EPOCH error code.

EndQuorumEpoch

The EndQuorumEpoch API is used by a leader to gracefully step down so that an election can be held immediately without waiting for the election timeout. It is sent to all voters in the quorum. The primary use case for this is to enable graceful shutdown. If the shutting down voter is either an active current leader or a candidate if there is an election in progress, then this request will be sent. It is also used when the leader needs to be removed from the quorum following an AlterQuorum request. 

Upon receiving an EndQuorumEpoch request, voters will begin a new election after a random time bounded by quorum.election.jitter.max.ms.

Request Schema

{
  "apiKey": N,
  "type": "request",
  "name": "EndQuorumEpochRequest",
  "validVersions": "0",
  "fields": [
    {"name": "ClusterId", "type": "string", "versions": "0+"},
    {"name": "ReplicaId", "type": "int32", "versions": "0+",
     "about": "The ID of the replica sending this request"},
    {"name": "LeaderId", "type": "int32", "versions": "0+",
     "about": "The current leader ID or -1 if there is a vote in progress"},
    {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
     "about": "The current epoch"}
  ]
}

Note that LeaderId and ReplicaId will be the same if the leader has been voted. If the replica is a candidate in a current election, then LeaderId will be -1.

Response Schema

{
  "apiKey": N,
  "type": "response",
  "name": "EndQuorumEpochResponse",
  "validVersions": "0",
  "fields": [
      {"name": "ErrorCode", "type": "int16", "versions": "0+"},
      {"name": "LeaderId", "type": "int32", "versions": "0+",
       "about": "The ID of the current leader or -1 if the leader is unknown."},
      {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
       "about": "The latest known leader epoch"}
  ]
}

EndQuorumEpoch Request Handling

Upon receiving the EndQuorumEpoch, the voter checks if the epoch from the request is greater than or equal to its last known epoch. If yes then it can transit to candidate state after waiting a random time up to the maximum jitter time defined by quorum.election.jitter.max.ms. Before beginning to collect voters, the voter must update the quorum-state file. If the epoch is smaller than the last known epoch, or the leader id is not known for this epoch, the request is rejected.

EndQuorumEpoch Response Handling

If there's no error code, then do nothing. Otherwise if the error code indicates there's already higher epoch leader already ("hey you're old news now so I don't care if you're stepping down or what"), then updating its quorum-state file while transiting to follower state. The leader treats this as a best-effort graceful shutdown. If voters cannot be reached to send EndQuorumEpoch, the leader will shutdown without retrying. In the worst case, if none of the EndQuorumEpoch requests are received, the election timeout will eventually trigger a new election.

We shall reuse the same LARGER_LEADER_EPOCH error code.

FetchQuorumRecords

The fetch request is sent by both voters and observers to the current leader in order to replicate log changes. For voters this also serves as a liveness check of the leader.

Log reconciliation: The Raft protocol does not guarantee that the replica with the largest offset is always elected. This means that following a leader election, a follower may need to truncate some of the uncommitted entries from its log. The Kafka log replication protocol has a similar problem following leader election and it is resolved with a separate truncating state which is entered after every leader change. In the truncating state, followers will use the OffsetsForLeaderEpoch API to find the diverging offset between its log and the leader's. Once that is found, the log is truncated and replication continue.

Rather than following the Kafka approach, this protocol piggybacks log reconciliation on the FetchQuorumRecords API, which is more akin to Raft replication. When voters or leaders send a FetchQuorumRecords request, in addition to including an offset to fetch from, they also indicate the epoch of the last offset that they have in their local log (we refer to this as the "fetch epoch"). The leader always checks whether the fetch offset and fetch epoch are consistent with its own log. If they do not match, the fetch response will indicate the largest epoch and its end offset before the requested epoch. It is the responsibility of followers to detect leader changes and use FetchQuorumRecords responses to truncate the log.

The advantage of this approach is that it simplifies the follower state machine. Basically it just needs to sends fetches to the current expected leader with the fetch offset. The response may indicate either a new leader to fetch from or a new fetch offset. It is also safer because the leader is able to validate the fetch offset on every request rather than relying on the follower to detect the epoch changes.

Extra condition on commitment: The Raft protocol requires the leader to only commit entries from any previous epoch if the same leader has already successfully replicated an entry from the current epoch. Kafka's ISR replication protocol suffers from a similar problem and handles it by not advancing the high watermark until the leader is able to write a message with its own epoch. The diagram below taken from the Raft dissertation illustrates the scenario:


The problem concerns the conditions for commitment and leader election. In this diagram, S1 is the initial leader and writes "2," but fails to commit it to all replicas. The leadership moves to S5 which writes "3," but also fails to commit it. Leadership then returns to S1 which proceeds to attempt to commit "2." Although "2" is successfully written to a majority of the nodes, the risk is that S5 could still become leader, which would lead to the truncation of "2" even though it is present on a majority of nodes.

In Kafka's replication protocol, we address this issue by not advancing the high watermark after an election until an entry has been written by the new leader with its epoch. So we would allow the election to S5, which had written "3" even though a majority of nodes had written "2." This is is allowable because "2" was not considered committed. Therefore, as long as we do not respond to client's request for the entries it added until the high watermark is advanced beyond it then this extra condition is satisfied.

This proposal follow's Kafka's replication protocol. The high watermark for the quorum is not advanced until a record from the current epoch has been written. This is the purpose of the "dummy" leader change message that was mentioned in the section on Vote handling. Without this new message, it could be some time before a new entry is written, which could prevent recent appends from being committed.

Request Schema

{
  "apiKey": N,
  "type": "request",
  "name": "FetchQuorumRecordsRequest",
  "validVersions": "0",
  "fields": [
      {"name": "ClusterId", "type": "int32", "versions": "0+"},
      {"name": "ReplicaId", "type": "int32", "versions": "0+",
       "about": "The ID of the replica sending the request"},
      {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
       "about": "The current leader epoch"},
      {"name": "FetchOffset", "type": "int64", "versions": "0+",
       "about": "The next expected offset to be replicated"},
      {"name": "FetchEpoch", "type": "int32", "versions": "0+",
       "about": "The epoch of the last replicated record"}
  ]
}

Response Schema

{
  "apiKey": N,
  "type": "response",
  "name": "FetchQuorumRecordsResponse",
  "validVersions": "0",
  "fields": [
      {"name": "ErrorCode", "type": "int16", "versions": "0+"},
      {"name": "LeaderId", "type": "int32", "versions": "0+",
       "about": "The ID of the current leader or -1 if the leader is unknown."},
      {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
       "about": "The latest known leader epoch"}
      {"name": "NextFetchOffset", "type": "int64", "versions": "0+",
       "about": "If set, this is the offset that the follower should truncate to"},
      {"name": "NextFetchOffsetEpoch", "type": "int32", "versions": "0+",
       "about": "The epoch of the next offset in case the follower needs to truncate"},
      {"name": "Records", "type": "bytes", "versions": "0+",
       "about" "The fetched record data"},
      {"name": "HighWatermark", "type": "int64", "versions": "0+",
       "about": "The current high watermark"},
	  {"name": "FirstDirtyOffset", "type": "int64", "versions": "0+",
       "about": "First dirty offset which allows followers to determine consistent snapshots"},
      {"name": "LastCaughtUpTimeMs", "type": "int64", "versions": "0+",
       "about" "The last time the follower was caught up with a majority of the voters"}
  ]
}

FetchQuorumRecords Request Handling

When a leader receives a FetchQuorumRecords request, it must check the following:

  1. Verify that the leader epoch is the same. If not, reject this request with MISMATCH_EPOCH error.
    1. If the leader epoch is smaller, then eventually this leader's BeginQuorumEpoch would reach the voter and that voter would update the epoch.
    2. If the leader epoch is larger, then eventually itself would learn about the new epoch anyways.
  2. Check that the epoch on the FetchOffset's  FetchEpoch are consistent with the leader's log. Specifically we check that FetchOffset is less than or equal to the end offset of FetchEpoch. If not, return OUT_OF_RANGE and encode the next FetchOffset as the last offset of the largest epoch which is less than or equal to the fetcher's epoch. This is a heuristic of truncating to let the voter truncate as much as possible to get to the starting-divergence point with fewer FetchQuorumRecords round-trips: if the fetcher's epoch is X which does not match the epoch of that fetching offset, then it means all records of epoch X on that voter may have diverged and hence could be truncated, then returning the next offset of largest epoch Y (< X) is reasonable.
  3. If the request is from a voter not an observer, the leader can possibly advance the high-watermark. As stated above, we only advance the high-watermark if the current leader has replicated at least one entry to majority of quorum to its current epoch. Otherwise, the high watermark is set to the maximum offset which has been replicated to a majority of the voters.

The check in step 2 is similar to the logic that followers use today in Kafka through the OffsetsForLeaderEpoch API. In order to make this check efficient, Kafka maintains a leader-epoch-checkpoint file on disk, the contents of which is cached in memory. After every epoch change is observed, we write the epoch and its start offset to this file and update the cache. This allows us to efficiently check whether the leader's log has diverged from the follower and where the point of divergence is. Note that it may take multiple rounds of FetchQuorumRecords in order to find the first offset that diverges in the worst case.

FetchQuorumRecords Response Handling

When handling the response, a follower/observer will do the following:

  1. If the response contains MISMATCH_EPOCH error code, check the leaderId from the response. If it is defined, then update the quorum-state file and become a follower. Otherwise, try to learn about the new leader via FindQuorum requests; in the mean time it may receive BeginQuorumEpoch request which would also update the new epoch / leader as well.
  2. If the response contains OUT_OF_RANGE error code, truncate its local log to the encoded nextFetchOffset, and then resend the FetchRecord request.
  3. If a follower has been kicked out off the quorum, it will receive a non-fatal NOT_FOLLOWER error code, and do a self-downgrade.

New Error Codes

MISMATCH_EPOCH(203, "The fetched follower has an inconsistent epoch with leader", LargerKnownEpochException::new);
NOT_FOLLOWER(204, "The fetched replica is no longer inside the quorum as a follower", NotFollowerException::new);

Discussion: Replication Progress Timeout for Zombie Leader

There's one caveat of the pull-based model: say a new leader has been elected with a new epoch and everyone has learned about it except the old leader (e.g. that leader was not in the voters anymore and hence not receiving the BeginQuorumEpoch as well), then that old leader would not be notified by anyone about the new leader / epoch and become a pure "zombie leader".

To resolve this issue, we introduced a "progress.timeout.ms" config, such that if the leader did not receive FetchQuorumRecords requests from a majority of the quorum for that amount of time, it would start sending FindQuorum request to random nodes in the cluster. And if the returned response includes a newer epoch leader, this zombie leader would step down and becomes an observer; and if it realized that it is still within the current quorum's voter list, it would start fetching from that leader. Note that the node will remain a leader until it finds that it has been supplanted by another voter.

Discussion: Pull v.s. Push Model

In the original Raft paper, the push model is used for replicating states, where leaders actively send requests to followers and count quorum from returned acknowledgements to decide if an entry is committed.

In our implementation, we used a pull model for this purpose. More concretely:

  • Consistency check: In the push model this is done at the replica's side, whereas in the pull model it has to be done at the leader. The voters / observers send the fetch request associated with its current log end offset. The leader will check its entry for offset, if matches the epoch the leader respond with a batch of entries (see below); if does not match, the broker responds to let the follower truncate all of its current epoch to the next entry of the leader’s previous epoch's end index (e.g. if the follower’s term is X, then return the next offset of term Y’s ending offset where Y is the largest term on leader that is < X). And upon receiving this response the voter / observer has to truncate its local log to that offset.
  • Extra condition on commitment: In the push model the leader has to make sure it does not commit entries that are not from its current epoch, in the pull model the leader still has to obey this by making sure that the batch it sends in the fetch response contains at least one entry associated with its current epoch.

Based on the recursive deduction from the original paper, we can still conclude that the leader / replica log would never diverge in the middle of the log while the log end index / term still agrees.

Comparing this implementation with the original push based approach:

  • On the Pro-side: with the pull model it is more natural to bootstrap a newly added member with empty logs because we do not need to let the leader retry sending request with decrementing nextIndex, instead the follower just send fetch request with zero starting offset. Also the leader could simply reject fetch requests from old-configuration’s members who are no longer part of the group in the new configuration, and upon receiving the rejection the replica knows it should shutdown now — i.e. we automatically resolve the “disruptive servers” issue.
  • On the Con-side: zombie leader step-down is more cumbersome, as with the push model, the leader can step down when it cannot successfully send heartbeat requests within a follower timeout, whereas with the pull model, the zombie leader does not have that communication pattern, and one alternative approach is that after it cannot commit any entries for some time, it should try to step down. Also, the pull model could introduce extra latency (in the worst case, a single fetch interval) to determine if an entry is committed, which is a problem especially when we want to update multiple metadata entries at a given time — this is pretty common in Kafka's use case — and hence we need to consider supporting batch writes efficiently in our implementation.

FindQuorum

The FindQuorum API is primarily used by new brokers to discover the leader of the current quorum and its voters. When a broker first starts up, for example, it will send FindQuorum requests to the bootstrap.servers. Once it discovers the current quorum membership and its own state, then will it resume participation in elections.

The FindQuorum API also provides the crucial role of discovering connection information for existing nodes using a gossip approach. Brokers include their own connection information in the request and collect the connection information of the voters in the cluster from the response.

Request Schema

{
  "apiKey": N,
  "type": "request",
  "name": "FindQuorumRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      { "name": "ReplicaId", "type": "int32", "versions": "0+"},
      { "name": "BootTimestamp", "type": "int64", "versions": "0+"},
      { "name": "Host", "type": "string", "versions": "0+"},
      { "name": "Port", "type": "int32", "versions": "0+"},
      { "name": "SecurityProtocol", "type": "int16", "versions": "0+"}
   ]
}

The BootTimestamp field is defined when a broker starts up. We use this as a heuristic for finding the latest connection information for a given replica.

Response Schema

{
  "apiKey": N,
  "type": "response",
  "name": "FindQuorumResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      {"name": "ErrorCode", "type": "int16", "versions": "0+"},
      {"name": "ClusterId", "type": "string", "versions": "0+"},
      {"name": "LeaderId", "type": "int32", "versions": "0+",
       "about": "The ID of the current leader or -1 if the leader is unknown."},
      {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
       "about": "The latest known leader epoch"}
      {"name": "Voters", "type": "[]Voter", "versions": "0+",
       "about": "The current voters" },
      {"name": "TargetVoters", "type": "[]Voter", "versions": "0+", "nullableVersion": "0+",
       "about": "The target voters if there is a reassignment in progress, null otherwise" },
  ],
  "commonStructs": [
    { "name": "Voter", "versions": "0+", "fields": [
        { "name": "VoterId", "type": "int32", "versions": "0+"},
        { "name": "BootTimestamp", "type": "int64", "versions": "0+"},
        { "name": "Host", "type": "string", "versions": "0+"},
        { "name": "Port", "type": "int32", "versions": "0+"},
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+"}
    }
  ]
}

FindQuorum Request Handling

When a broker receives a FindQuorum request, it will do the following:

  1. If the ReplicaId from the request is greater than 0 and matches one of the existing voters (or target voters), update local connection information for that node if either that node is not present in the cache or if the BootTimestamp from the request is larger than the cached value.
  2. Respond with the latest known leader and epoch along with the connection information of all known voters.

Note on AdminClient Usage: This API is also used by the admin client in order to describe/alter quorum state. In this case, we will use a sentinel -1  for ReplicaId. The values for the other fields in the request will not be inspected by the broker if the ReplicaId is less than 0, so we can choose reasonable sentinels for them as well (e.g. Host can be left as an empty string).

FindQuorum Response Handling

After receiving a FindQuorum response

  1. First check the leader epoch in the response against the last known epoch. It is possible that the node receives the latest epoch from another source before the FindQuorum returns, so we always have to check. If the leader epoch is less than the last known epoch, we ignore the response.
  2. The node will then update its local cache of connection information with the values from the response. As when handling requests, the node will only update cached connection information corresponding to the largest observed BootTimestamp.
  3. Next check if there is a leader defined in the response. If so, then the node will update quorum-state and become a follower.
  4. If there is no leader and the sender is one of the voters, then the node will become a candidate and attempt to find the other nodes in the quorum (if they are not known) in order to send VoteRequest to. If the sender is not a voter, then it will continue to try and discover the leader by sending FindQuorum to other known voters or to bootstrap.servers.

Note on Connection Discovery with Gossip: The reason this API is specified as a gossip protocol is that the cluster must be able to propagate connection information even when there are insufficient voters available to establish a quorum. There must be some base layer which is able to spread enough initial metadata so that voters can connect to each other and conduct elections. An alternative way to do this would be to require connection information always be defined statically in configuration, however this makes operations cumbersome in modern cloud environments where hosts change frequently. We have opted instead to embrace dynamic discovery.

Quorum Bootstrapping

This section discusses the process for establishing the quorum when the cluster is first initialized and afterwards when nodes are restarted. 

When first initialized, we rely on the quorum.voters configuration to define the expected voters. Brokers will only rely on this when starting up if there is no quorum state written to the log and if the broker fails to discover an existing quorum through the FindQuorum API. We suggest that a --bootstrap flag could be passed through kafka-server-start.sh when the cluster is first started to add some additional safety. In that case, we would only use the config when the flag is specified and otherwise expect to discover the quorum dynamically.

Assuming the broker fails to discover an existing quorum, it will then check its broker.id to see if it is expected to be one of the initial voters. If so, then it will immediately include itself in FindQuorum responses. Until the quorum is established, brokers will send FindQuorum to the nodes in bootstrap.servers and other discovered voters in order to find the other expected members. Once enough brokers are known, the brokers will begin a vote to elect the first leader.

Note that the FindQuorum API is intended to be a gossip API. Quorum members use this to advertise their connection information on bootstrapping and to find new quorum members. All brokers use the latest quorum state in response to FindQuorum requests that they receive. The latest voter state is determined by looking at 1) the leader epoch, and 2) boot timestamps. Brokers only accept monotonically increasing updates to be cached locally.

Bootstrapping Procedure

To summarize, this is the procedure that brokers will follow on initialization:

  1. Upon starting up, brokers always try to bootstrap its knowledge of the quorum by first reading the quorum-state file and then scanning forward from AppliedOffset to the end of the log to see if there are any changes to the quorum state. For newly started brokers, the log / file would all be empty so no previous knowledge can be restored.
  2. If after step 1), there's some known quorum state along with a leader / epoch already, the broker would:
    1. Promote itself from observer to voter if it finds out that it's a voter for the epoch.
    2. Start sending FetchQuorumRecords request to the current leader it knows (it may not be the latest epoch's leader actually).
  3. Otherwise, it will try to learn the quorum state by sending FindQuorum to any other brokers inside the cluster via boostrap.servers as the second option of quorum state discovery.
  4. If even step 3) cannot find any quorum information – e.g. when there's no other brokers in the cluster, or there's a network partition preventing this broker to talk to others in the cluster – fallback to the third option of quorum state discover by checking if it is among the brokers listed in quorum.voters.
    1. If so, then it will promote to voter state and add its own connection information to the cached quorum state and return that in the FindQuorum responses it answers to other brokers; otherwise stays in observer state.
    2. In either case, it continues to try to send FindQuorum to all other brokers in the cluster via boostrap.servers.
  5. For any voter, after it has learned a majority number of voters in the expected quorum from FindQuorum responses, it will begin a vote.

When a leader has been elected for the first time in a cluster (i.e. if the leader's log is empty), the first thing it will do is append a VoterAssignmentMessage (described in more detail below) which will contain quorum.voters as the initial CurrentVoters. Once this message has been persisted in the log, then we no longer need `quorum.voters` and users can safely roll the cluster without this config.

ClusterId generation: Note that the first leader is also responsible for providing the ClusterId field which is part of the VoterAssignmentMessage. If the cluster is being migrated from Zookeeper, then we expect to reuse the existing clusterId. If the cluster is starting for the first time, then a new one will be generated. Once this message has been committed to the log, the leader and all future leaders will strictly validate that this value matches the ClusterId provided in requests when receiving Vote, BeginEpoch, EndEpoch, and FetchQuorumRecords requests. 

The leader will also append a LeaderChangeMessage as described in the VoteResponse handling above. This is not needed for correctness. It is just useful for debugging and to ensure that the high watermark always has an opportunity to advance after an election.

Bootstrapping Example

With this procedure in mind, a convenient way to initialize a cluster might be the following.

  1. Start with a single node configured with broker.id=0 and quorum.voters=0.
  2. Start the node and verify quorum status. This would also be a good opportunity to make dynamic config changes or initialize security configurations.
  3. Add additional nodes to the cluster and verify that they can connect to the quorum.
  4. Finally use AlterQuorum to grow the quorum to the intended size.

Quorum Reassignment

The protocol for changing the active voters is well-described in the Raft literature. The high-level idea is to use a new control record type to write quorum changes to the log. Once a quorum change has been written and committed to the log, then the quorum change can take effect.

Quorum Change Message Schema

This will be a new control record type with Type=2 in the key schema. Below we describe the message value schema used for the quorum change messages written to the log. 

{
  "type": "message",
  "name": "VoterAssignmentMessage",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      {"name": "ClusterId", "type": "string", "versions": "0+"}
      {"name": "CurrentVoters", "type": "[]Voter", "versions": "0+", "fields": [
        {"name": "VoterId", "type": "int32", "versions": "0+"}
      ]},
      {"name": "TargetVoters", "type": "[]Voter", "versions": "0+", "nullableVersions": "0+", "fields": [
        {"name": "VoterId", "type": "int32", "versions": "0+"}
      ]}
  ]
}

DescribeQuorum

The DescribeQuorum API is used by the admin client to show the status of the quorum. This includes showing the progress of a quorum reassignment and viewing the lag of followers.

Unlike the FindQuorum request, this API must be sent to the leader, which is the only node that would have lag information for all of the voters.

Request Schema

{
  "apiKey": N,
  "type": "request",
  "name": "DescribeQuorumRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": []
}

Response Schema

{
  "apiKey": N,
  "type": "response",
  "name": "DescribeQuorumResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      {"name": "ErrorCode", "type": "int16", "versions": "0+"},
      {"name": "LeaderId", "type": "int32", "versions": "0+",
       "about": "The ID of the current leader or -1 if the leader is unknown."},
      {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
       "about": "The latest known leader epoch"},
      {"name": "HighWatermark", "type": "int64", "versions": "0+"},
	  {"name": "CurrentVoters", "type": "[]VoterState", "versions": "0+" },
      {"name": "TargetVoters", "type": "[]VoterState", "versions": "0+" }
  ],
  "commonStructs": [
	{"name": "VoterState", "versions": "0+", "fields": [
  	    { "name": "VoterId", "type": "int32", "versions": "0+"},
        { "name": "LogEndOffset", "type": "int64", "versions": "0+",
          "about" "The last known log end offset of the follower or -1 if it is unknown"},
        { "name": "LastCaughtUpTimeMs", "type": "int64", "versions": "0+",
          "about" "The last time the follower was caught up to the high watermark"},
    ]}
  ]
}

DescribeQuorum Request Handling

This request is always sent to the leader node. We expect AdminClient to use FindQuorum in order to discover the current leader. Upon receiving the request, a node will do the following:

  1. First check whether the node is the leader. If not, then return an error to let the client retry with FindQuorum. If the current leader is known to the receiving node, then include the LeaderId and LeaderEpoch in the response.
  2. Build the response using cached information about replication progress.

DescribeQuorum Response Handling

On handling the response, the admin client would do the following:

  1. If the response indicates that the intended node is not the current leader, then check the response to see if the LeaderId has been set. If so, then attempt to retry the request with the new leader.
  2. If the current leader is not defined in the response (which could be the case if there is an election in progress), then backoff and retry with FindQuorum.
  3. Otherwise the response can be returned to the application, or the request eventually times out.

New Error Codes

NOT_QUORUM_LEADER(205, "Target broker is not he leader of this quorum", LargerKnownEpochException::new);

AlterQuorum

The AlterQuorum API is used by the admin client to reassign the voters of the quorum or cancel an ongoing reassignment. It requires ALTER on CLUSTER permission.

The effect of AlterQuorum is to change the TargetVoters field in the VoterAssignmentMessage defined above. Once this is done, the leader will begin the process of bringing the new nodes into the quorum and kicking out the nodes which are no longer needed.

Cancellation: If TargetVoters is set to null in the request, then effectively we will cancel an ongoing reassignment and leave the quorum with the current voters. The more preferable option is to always set the intended TargetVoters. Note that it is always possible to send a new AlterQuorum request even if the pending reassignment has not finished. So if we are in the middle of a reassignment from (1, 2, 3) to (4, 5, 6), then the user can cancel the reassignment by resubmitting (1, 2, 3) as the TargetVoters.

Request Schema

{
  "apiKey": N,
  "type": "request",
  "name": "AlterQuorum",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      {"name": "ClusterId", "type": "string", "versions": "0+"},
      {"name": "TargetVoters", "type": "[]Voter", "nullableVersions": "0+", "default": "null", 
	   "about": "The target quorum, or null if this is a cancellation request",
	   "versions": "0+", "fields": [
        {"name": "VoterId", "type": "int32", "versions": "0+"}
      ]}
  ]
}

Response Schema

{
  // Possible top level error codes:
  //
  // NOT_QUORUM_LEADER
  // CLUSTER_AUTHORIZATION_FAILED
  //
  // Possible node level error codes:
  //
  // UNKNOWN_VOTER_ID
  //
  "apiKey": N,
  "type": "response",
  "name": "AlterQuorumResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      {"name": "ErrorCode", "type": "int16", "versions": "0+"},
	  { "name": "TargetVoters", "type": "[]VoterResult", "versions": "0+",
        "about": "The responses for each target voters in the new quorum.", "fields": [
        { "name": "VoterId", "type": "int32", "versions": "0+"},
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." }
  ]
}

AlterQuorum Request Handling

Upon receiving the AlterQuorum request, the node will verify a couple of things:

  1. If the target node is not the leader of the quorum, return NOT_QUORUM_LEADER code to the admin client. 
  2. If the included voter is unknown to the quorum leader, return UNKNOWN_VOTER_ID code to the admin client.
  3. Otherwise, if the TargetVoters in the request does not match the current TargetVoters, then append a new VoterAssignmentMessage to the log and wait for it to be committed. 
  4. Return successfully only when the desired TargetVoters has been safely committed to the log.

After returning the result of the request, the leader will begin the process to modify the quorum. This is described in more detail below, but basically the leader will begin tracking the fetch state of the target voters. It will then make a sequence of single-movement alterations by appending new VoterAssignmentMessage records to the log.

AlterQuorum Response Handling

The response handling is similar to DescribeQuorum.  The admin client would do the following:

  1. If the response indicates that the intended node is not the current leader, then check the response to see if the LeaderId has been set. If so, then attempt to retry the request with the new leader.
  2. If the current leader is not defined in the response (which could be the case if there is an election in progress), then backoff and retry with FindQuorum.
  3. Otherwise return the successful response to the application.

 Once the reassignment has been accepted by the leader, then a user can monitor the status of the reassignment through the DescribeQuorum API. 

New Error Codes

UNKNOWN_VOTER_ID(206, "The specified target voter is unknown to the leader", UnknownVoterIdException::new);

Quorum Change Protocol

This protocol allows arbitrary quorum changes through the AlterQuorum API. Internally, we arrive at the target quorum by making a sequence of single-member changes.

Before getting into the details, we consider an example which summarizes the basic mechanics. Suppose that we start with a quorum consisting of the replicas (1, 2, 3) and we want to change the quorum to (4, 5, 6). When the leader receives the request to alter the quorum, it will write a message to the current quorum indicating the desired change:

currentVoters: [1, 2, 3]
targetVoters: [4, 5, 6]

Once this message has been committed to the quorum, the leader will add 4 to the current quorum by writing a new log entry:

currentVoters: [1, 2, 3, 4]
targetVoters: [4, 5, 6]

A common optimization for this step is to wait until 4 has caught up near the end of the log before adding it. Similarly, we may choose to add another replica first if it has already caught up.

The leader will send a BeginQuorumEpoch request to replica 4 so that it is aware of the voter change. Once this entry has been committed to the log, we can remove one of the existing replicas (say 3) and write a new message:

currentVoters: [1, 2, 4]
targetVoters: [4, 5, 6]

Continuing in this way, eventually eventually arrive at the following state:

currentVoters: [1, 4, 5, 6]
targetVoters: [4, 5, 6]

Suppose that replica 1 is the current leader. Before finalizing the reassignment, it will resign its leadership by sending EndQuorumEpoch to the other voters. Once the new leader takes over, it will complete the reassignment by removing replica 1 from the current voters. At this point, the quorum change has completed and we can clear the targetVoters field:

currentVoters: [4, 5, 6]
targetVoters: null

The previous leader will resume as a voter until it knows that it is no longer one of them. The leader will always choose to remove itself last as long as progress can still be made.

To summarize the protocol details:

  1. Quorum changes are executed through a series of single-member changes.
  2. The BeginQuorumEpoch API is used to notify new voters of their new status.
  3. The EndQuorumEpoch API is used by leaders when they need to remove themselves from the quorum.

Reassignment Algorithm

Upon receiving an AlterQuorum request, the leader will do the following:

  1. Append an VoterAssignmentMessage to the log with the current voters as CurrentVoters and the TargetVoters from the AlterQuorum request.
  2. Leader will compute 3 sets based on CurrentVoters and TargetVoters:
    1. RemovingVoters: voters to be removed
    2. RetainedVoters: voters shared between current and target
    3. NewVoters: voters to be added
  3. Based on comparison between size(NewVoters) and size(RemovingVoters),
    1. If size(NewVoters) >= size(RemovingVoters), pick one of NewVoters as NV by writing a record with CurrentVoters=CurrentVoters + NV, and TargetVoters=TargetVoters.
    2. else pick one of RemovingVoters as RV, preferably a non-leader voter, by writing a record with CurrentVoters=CurrentVoters - RV, and TargetVoters=TargetVoters.
  4. Once the record is committed, the membership change is safe to be applied. Note that followers will begin acting with the new voter information as soon as the log entry has been appended. They do not wait for it to be committed.
  5. As there is a potential delay for propagating the removal message to the removing voter, we piggy-back on the `FetchQuorumRecords` to inform the voter to downgrade immediately after the new membership gets committed. See the error code NOT_FOLLOWER.
  6. The leader will continue this process until one of the following scenarios happens:
    1. If TargetVoters = CurrentVoters, then the reassignment is done. The leader will append a new entry with TargetVoters=null to the log.
    2. If the leader is the last remaining node in RemovingVoters, then it will step down by sending EndQuorumEpoch to the current voters. It will continue as a voter until the next leader removes it from the quorum.

Note that there is one main subtlety with this process. When a follower receives the new quorum state, it immediately begins acting with the new state in mind. Specifically, if the follower becomes a candidate, it will expect votes from a majority of the new voters specified by the reassignment. However, it is possible that the VoterAssignmentMessage gets truncated from the follower's log because a newly elected leader did not have it in its log. In this case, the follower needs to be able to revert to the previous quorum state. To make this simple, voters will only persist quorum state changes in quorum-state after they have been committed. Upon initialization, any uncommitted state changes will be found by scanning forward from the LastOffset indicated in the quorum-state.

In a similar vein, if the VoterAssignmentMessage fails to be copied to all voters before a leader failure, then there could be temporary disagreement about voter membership. Each voter must act on the information they have in their own log when deciding whether to grant votes. It is possible in this case for a voter to receive a request from a non-voter (according to its own information). Voters must reject votes from non-voters, but that does not mean that the non-voter cannot ultimately win the election. Hence when a voter receives a VoteRequest from a non-voter, it must then become a candidate.

Observer Promotion

To ensure no downtime of the cluster switch, newly added nodes should already be acting as an up-to-date observer to avoid unnecessary harm to the cluster availability. There are two approaches to achieve this goal, either from leader side or from observer side:

  • Leader based approach: leader is responsible for tracking the observer progress by monitoring the new entry replication speed, and promote the observer when one replication round is less than election timeout, which suggests the gap is sufficiently small. This idea adds burden to leader logic complexity and overhead for leadership transfer, but is more centralized progress management.

  • Observer based approach: a self-nomination approach through observer initiates readIndex call to make sure it is sufficiently catching up with the leader. If we see consecutive rounds of readIndex success within election timeout, the observer will trigger a config change to add itself as a follower on next round. This is a de-centralized design which saves the dependency on elected leader to decide the role, thus easier to reason about.

To be effectively collecting information from one place and have the membership change logic centralized, leader based approach is more favorable. However, tracking the progress of an observer should only happen during reassignment, which is also the reason why we may see incomplete log status from DescribeQuorum API when the cluster is stable.

Reassignment Admin Tools

To facilitate the usability of the reassignment protocol, we would also add two admin requests:

KafkaAdminClient.java
public QuorumInfo describeQuorum(DescribeQuorumOptions options);

public AlterQuorumInfo alterQuorum(Set<Integer> newVoterIds, AlterQuorumOptions options);

The recommended usage is to first use the describeQuorum API to get the existing cluster status, learning whether there is any ongoing reassignment. The QuorumInfo struct is deducted from the DescribeQuorumResponse:

KafkaAdminClient.java
public clase QuorumInfo {
	
	Map<Integer, VoterDescription> currentVoters();

	Map<Integer, VoterDescription> targetVoters();

	Map<Integer, VoterDescription> observers();

	Optional<Integer> leaderId();

    int leaderEpoch();

	long highWatermark();
 
	public static class VoterDescription {
		Optional<Long> logEndOffset();
        Optional<Long> lastCaughtUpTimeMs();
	}
}

The epoch and offsets are set as optional because the leader may not actively maintain all the observers' status.

How does the reassignment tool work internally

When the admin calls the alterQuorum, underlying the thread will first send a FindQuorumRequest to find the stable leader. If that call times out or the group is in the election, the call would fail and inform user to retry. Once the leader is found, AdminClient will send AlterQuorumRequest to it. If the returned error is retriable like NOT_QUORUM_LEADER, the tool will perform a rediscovery of the quorum leader. For fatal errors such as authorization errors, the call would fail and inform user the result.

Tooling Support

We will add a new utility called kafka-quorum.sh to describe and alter quorum state. As usual, this tool will require --bootstrap-server to be provided.  We will support the following options:

--describe 

There will be two options available with --describe: 

  • --describe status: a short summary of the quorum status and the other provides detailed information about the status of replication.
  • --describe replication: provides detailed information about the status of replication

Here are a couple examples:

> bin/kafka-quorum.sh --describe
LeaderId:				0
LeaderEpoch: 			15
HighWatermark:			234130
MaxFollowerLag: 		34
MaxFollowerLagTimeMs:	15
CurrentVoters:			[0, 1, 2]
TargetVoters:			[0, 3, 4]

> bin/kafka-quorum.sh --describe replication
ReplicaId	LogEndOffset	Lag		LagTimeMs	Status		IsReassignTarget
0			234134			0		0			Leader		Yes
1			234130			4		10			Follower	No
2			234100			34		15			Follower	No
3			234124			10		12			Observer	Yes
4			234130			4		15			Observer	Yes

--alter

Initially, the only purpose of this API is to perform reassignments. In the future, there may be additional uses. Below is an example usage:

> bin/kafka-quorum.sh --alter --voters 0,3,4
CurrentVoters:			[0, 1, 2]
TargetVoters:			[0, 3, 4]


Client Interactions

Since this specific quorum implementation is only to be used by Kafka internally, we do not need to add new public protocols for clients. More specifically, the leader of the quorum would act as the controller of the cluster, and any client requests that requires updating the metadata (previously stored in ZK) would be interpreted as appending new record(s) to the quorum's internal log.

For some operations that require updating multiple metadata entries (i.e. previously as multiple ZK writes updating more than one ZK path), they would be interpreted as a batch-record appends.

The record append (either singular or batch) would not return until the leader acknowledged that high watermark has advanced past the appended records' offsets — they have been committed. For batch appends, we would only return when all records have been committed.

Note it is possible that a request could time out before the leader has successfully committed the records, and the client or the broker itself would retry, which would result in duplicated updates to the quorum. Since in Kafka's usage, all updates are overwrites which are idempotent (as the nature of configuration is a key-value mapping). Therefore, we do not need to implement serial number or request caching to achieve "exactly-once".

Metadata Versioning and Log Compaction

Each broker in KIP-500 will be an observer of the metadata log and will materialize the entries into a cache of some kind. We want to provide a stronger guarantee on the consistency of the metadata than is possible today. Let's refer to a materialized table of the log entries on some observer O up to a given offset N as Snapshot(O, N). The offset in this case can be considered as the version of the metadata. This version could in the future be used by the client to detect stale metadata so we want to be sure that it can be maintained consistently. In particular, if two observers and expose metadata up to N, then we want to guarantee that Snapshot(K, N) == Snapshot(J, N)

If all entries in the metadata log are indefinitely retained, then this is trivially achieved. However, in practice, we have to compact the log in order to keep it from growing too large. This proposal relies on Kafka's existing log cleaner for log compaction. Other approaches involve maintaining snapshots, but the benefits of using the log cleaner are 1) the logic already exists and is well-understood, and 2) it simplifies the fetch protocol since we do not need a different mechanism to fetch snapshots.

The diagram below presents a brief overview of how compaction works in Kafka today. The cleaner maintains an offset known as the "first dirty offset." On a given round of cleaning, the cleaner will scan forward starting from the dirty offset and build a table of the key-value pairs until either the end of the log is reached or the table grows too large. The end offset of this scanning becomes the next dirty offset once this round of cleaning completes. After building this table, the cleaner scans from the beginning of the log and builds a new log which consists of all the entries which are not present in the table. Once the new log is ready, it is atomically swapped with the current log.

In order to address the "consistent versioning" problem mentioned above, an observer needs to be able to tell when it has reached an offset such that the materialized snapshot at that offset is guaranteed to be consistent among all replicas of the log. The challenge is that an observer which is fetching the log from the leader does not know which portion of the log has already been cleaned. For example, using the diagram above, if we attempt to materialize the state after only reading up to offset 5, then our snapshot will not contain keys k3 and k4 even though they may have been present in the original log at offsets.

Our solution to address this problem is simple. We require the leader to indicate its current dirty offset in each FetchQuorumRecords response. An observer will know if its current snapshot represents a consistent version if and only if its local log end offset after appending the records from the response is greater than or equal to the dirty offset received from the leader.

This is outside the scope of this proposal, but we suggest that this approach to versioning may be useful for managing metadata consistency on the clients. Each metadata response can indicate the corresponding version of the metadata log so that clients have an easy way to detect stale metadata. 

Additionally, we provide a separate lastCaughtUpTimeMs field in the FetchQuorumRecords response which can be useful for an observer to detect how well it is keeping up with the replication. If lastCaughtUpTimeMs begins to grow, then an observer may decide to stop serve metadata requests from clients.

Quorum Performance

The goal for Raft quorum is to replace Zookeeper dependency and reach higher performance for metadata operations. In the first version, we will be building necessary metrics to monitor the end-to-end latency from admin request (AlterQuorum) and client request being accepted to being committed. We shall monitor the time spent on local, primarily the time to fsync the new records and time to apply changes to the state machine, which may not be really a trivial operation. Besides we shall also monitor the time used to propagate change on the remote, I.E. latency to advance the high watermark. Benchmarks will also be built to compare the efficiency for a 3-node broker cluster using Zookeeper vs Raft, under heavy load of metadata changes. We shall also be exploring existing distributed consensus system load frameworks at the same time, but this may beyond the scope of KIP-595. 

Rejected Alternatives

Use an existing Raft library: Log replication is at the core of Kafka and the project should own it. Dependence on a third-party system or library would defeat one of the central motivations for KIP-500. There would be no easy way to evolve a third-party component according to the specific needs of Kafka. For example, we may eventually use this protocol for partition-level replication, but it would make compatibility much more difficult if we cannot continue to control the log layer. So if we must control both the log layer and the RPC protocol, then the benefit of a third-party library is marginal and the cost is an unnecessary constraint on future evolution. Furthermore, Raft libraries typically bring in their own RPC mechanism, serialization formats, have their own monitoring, logging, etc. All of this requires additional configuration the user needs to understand.

Start with partition replication using Raft: As mentioned in several places in this doc, we are in favor of making Raft an available replication mechanism for individual partitions. That begs the question of whether we should just start there rather than creating a separate protocol for the metadata quorum? We were indeed tempted to do so, but ultimately decided not to because of the significant amount of orthogonal overhead it adds to the KIP-500 roadmap. As an example, we would need to reconsider how metadata is propagated to clients since the controller would no longer be responsible for elections. We could continue routing everything through the controller as is currently done today, but then that weakens one of the main motivations for Raft-based partition replication. Additionally, we would need a Raft protocol which could efficiently batch elections. These problems are tractable, but solving them takes us well out of the scope of KIP-500. With that said, we wanted to make this protocol to mirror the current replication protocol in some respects to make the eventual transition easier. That is one of the reasons we opted for a pull–based protocol as mentioned below. It is also the reason we decided to reuse existing log compaction semantics.

Push vs Pull: Raft is specified as a push protocol. The leader must track the state of of all replicas in order to reconcile differences and it is responsible for pushing the changes to them. Kafka's replication protocol on the other hand is pull-based. Replicas know which offset they need and fetch it from the leader. Reconciliation is driven by the replica. This makes it suited to large numbers of observer replicas since the leader only needs to track the status of replicas in the quorum. We have opted to stick with the pull-based model in this protocol for this reason and because it allows for easier reuse of the log layer. This also simplifies the transition to raft replication for topic partitions.

Support Atomic Arbitrary Quorum Change: In the Raft literature, another quorum change approach proposed was to support arbitrary number of node changes for the existing quorum in one shot. This requires a larger scope of the code logic change as we need to maintain two sets of node quorum inside the cluster.

The idea is that while during the migration from an old quorum to a new quorum, each appended message during this period has to be routed to all the existing nodes inside both old config and new config, as well as majority votes from two quorums. The benefit of this approach is that the quorum change becomes atomic across the group, but it comes with the cost of complexing normal operations and provides little practical values, since most operation in production does not require more than one server migration. The literature explicitly does not recommend this approach and the example implementation is indeed hard to reason about correctness.

Observer Based Promotion Observer based self promotion also has merits because we do pull based model already. It could just monitor its lagging from the leader, and when the gap is consistently below a certain threshold for a couple of rounds of fetch, we may call this observer ready to be joining the group. This special logic could be embedded inside the observer when handling BeginQuorumEpoch request sent from the leader. It will put the request in a delayed queue and only reply the leader for a success once the observer assumed itself as in-sync. However at the moment, we want to centralize the information about the quorum instead of letting it scatter around and increase the interaction complexity. In that sense, this approach may not be preferable.

  • No labels