ID | IEP-131 |
Author | |
Sponsor | |
Created | |
Status | DRAFT |
Allow users to choose between HA and SC modes for partitions. The SC mode corresponds to the current behavior of Ignite 3, where strong data consistency is maintained and any loss of majority leads to unavailability. In the HA mode, users can read and write data from the surviving nodes even when the majority is lost, accepting possible data loss as a trade-off.
Ignite 3 uses RAFT as the consensus algorithm to achieve consistency and availability of data. The fundamental restriction of such algorithms is that a replication group of 2 * F + 1 nodes survives only F failed nodes: for example, a 5-node group tolerates the loss of 2 nodes, while losing 3 makes it unavailable. In our case this means that a partition becomes unavailable if the majority of nodes from the partition's assignment are unavailable. With such an approach we ensure that the partition is SC.
In contrast, Ignite 2 provides flexibility through TRANSACTIONAL caches with FULL_SYNC (the client node waits for a write or commit to complete on all participating remote nodes) and PRIMARY_SYNC (the client node waits for a write or commit to complete on the primary node only, without waiting for backups to be updated) modes, allowing users to prioritize availability over strict consistency when necessary. This flexibility enables read and write operations even when the majority of nodes is lost, at the cost of potential trade-offs in data consistency.
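For reference, a corresponding Ignite 2 cache configuration might look like the following sketch (the cache name and backup count are illustrative):

import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.configuration.CacheConfiguration;

public class Ignite2CacheConfigExample {
    // Illustrative Ignite 2 configuration: a transactional cache whose writes complete
    // once the primary copy is updated; backups are updated asynchronously.
    static CacheConfiguration<Integer, String> primarySyncTxCache() {
        CacheConfiguration<Integer, String> cfg = new CacheConfiguration<>("exampleCache");
        cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
        cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC);
        cfg.setBackups(2);
        return cfg;
    }
}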
Example scenario illustrating HA mode functionality
Consider a cluster consisting of nodes: A, B, C, and D.
Scenario steps:
Key Considerations:
General requirements are the following:
- CMG/MS majority preservation:
- Stable topology assumption:
- Possible node cleanup on rejoin and rebalance:
- Acceptable unavailability duration:
We introduce new parameters in the zone's descriptor:
So, SC mode corresponds to the current behaviour of a zone with scale-up/scale-down timers, where assignments are recalculated only after dataNodesAutoAdjustScaleUp/dataNodesAutoAdjustScaleDown fire.
In addition to these timers, in HA mode partitionDistributionResetTimeout controls how quickly the system adjusts to node removals in order to maintain availability.
It is reasonable to provide users with recommended values for all zone timers:
public class CatalogZoneDescriptor extends CatalogObjectDescriptor {
    /**
     * Specifies the consistency mode of the zone, determining how the system balances data consistency and availability.
     *
     * <p>
     * Possible values are:
     * <ul>
     *     <li><b>STRONG_CONSISTENCY</b>: Ensures strong consistency by requiring a majority of nodes for operations.
     *     Partitions become unavailable if the majority of assigned nodes are lost.</li>
     *     <li><b>HIGH_AVAILABILITY</b>: Prioritizes availability over strict consistency, allowing partitions to remain
     *     available for read-write operations even when the majority of assigned nodes are offline. The system
     *     reconfigures the RAFT group to include only the available nodes without changing the assignments.</li>
     * </ul>
     * </p>
     */
    private final ConsistencyMode consistencyMode;

    /**
     * The timeout after which the system checks partition assignments. If the majority of assigned nodes are lost,
     * it reconfigures the RAFT group to exclude offline nodes and restore availability, without changing the assignments.
     */
    private final int partitionDistributionResetTimeout;
}
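For completeness, the referenced ConsistencyMode enum is assumed to carry the two values described in the javadoc above (a sketch; the real enum may differ):

public enum ConsistencyMode {
    /** Current behaviour: the partition becomes unavailable when the majority of assigned nodes is lost. */
    STRONG_CONSISTENCY,

    /** The partition stays available for reads and writes even without a majority, at the risk of data loss. */
    HIGH_AVAILABILITY
}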
The corresponding SQL syntax must be enhanced with the new parameters as well:
CREATE ZONE IF NOT EXISTS exampleZone WITH CONSISTENCY_MODE='HIGH_AVAILABILITY', PARTITION_DISTRIBUTION_RESET_TIMEOUT=1

ALTER ZONE IF EXISTS exampleZone SET PARTITION_DISTRIBUTION_RESET_TIMEOUT=5
If the user does not specify CONSISTENCY_MODE, it defaults to STRONG_CONSISTENCY (SC).
Let's describe the algorithm step by step, which will clarify the meaning and usage of this timer.
Every node operates according to the following algorithm:
metastoreTopologyChangeEventReceived:
    onNodeLeft:
        initPartitionDistributionResetTimer(revision)
        initDataNodesAutoAdjustScaleDownTimer()
    onNodeJoined:
        initDataNodesAutoAdjustScaleUpTimer()
    onTopologyLeap:
        initPartitionDistributionResetTimer()
        initDataNodesAutoAdjustScaleUpTimer()
        initDataNodesAutoAdjustScaleDownTimer()

onPartitionDistributionResetTimeout:
    revision = getRevisionFromEvent()
    for t in zoneTables:
        for p in tablePartitions:
            if !checkMajority(p, revision):
                resetPartition(p)

nodeStarted:
    (stable, pending) = (getCurrentStableAssignmentsFromMs(), getCurrentPendingAssignmentsFromMs())
    // pending.force flag is a result of resetPartition,
    // see disaster recovery protocol
    if currentNode in union(stable, pending) && !pending.force:
        startRaftNode()
    else:
        if (pending.force):
            assert pending.size == 1 // see resetPartition requirements
            startRaftNode()
        else:
            skip

// Definitions.
// If you can't find any functions here,
// it's supposed that their behaviour is trivial.

initPartitionDistributionResetTimer(revision):
    // in the case of a concurrent set,
    // the new timer must reschedule the old one with the new revision
    saveTimerIntent(topologyEventRevision, currentTime())

checkMajority(partition, revision):
    assignments = getCurrentAssignmentsFromStableMetastoreKey(partition)
    aliveAssignments = assignments.intersect(getLogicalTopology(revision))
    return aliveAssignments.size() >= 1 + assignments.size() / 2

resetPartition(partition):
    /*
    It will be the reworked method from the disaster recovery feature
    (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=315493878).
    Requirements:
    - Leader hijack protection must be already implemented. An analogue of IGNITE-22904
      must be implemented for non-metastore partitions.
    - Reset must work in 2 phases: (planned=targetTopology, pending=singleAliveNode),
      where targetTopology is the set of alive nodes from the current stable assignments.
    - Only alive nodes must be placed in the planned key.
    */

initDataNodesAutoAdjustScaleUpTimer()
    // see https://cwiki.apache.org/confluence/display/IGNITE/IEP-101%3A+Distribution+Zones

initDataNodesAutoAdjustScaleDownTimer()
    // see https://cwiki.apache.org/confluence/display/IGNITE/IEP-101%3A+Distribution+Zones
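For illustration, the checkMajority step can be sketched in plain Java over node-name sets (the set-based representation of assignments and of the logical topology is an assumption of this sketch):

import java.util.HashSet;
import java.util.Set;

// Sketch of the majority check from the pseudocode above: a partition keeps its
// majority if at least floor(n / 2) + 1 of its assigned nodes are in the logical topology.
final class MajorityCheck {
    static boolean checkMajority(Set<String> stableAssignments, Set<String> logicalTopology) {
        Set<String> alive = new HashSet<>(stableAssignments);
        alive.retainAll(logicalTopology);
        return alive.size() >= stableAssignments.size() / 2 + 1;
    }
}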
Key outcomes:
Additional notes:
Let's call the previously described algorithm Phase 1 of this feature.
In Phase 1, when the partition has recovered its majority and nodes start after a rolling restart, it is possible that some user data will be lost.
For example, when nodes leave one by one, the last remaining node may not have up-to-date data because it could be a lagging follower. As stated in Phase 1, the cluster will wait for that node, the RAFT group will be recovered on it, and this will cause data loss.
In Phase 2 we address this rolling restart issue so that the RAFT group ends up with the latest up-to-date state after a rolling restart.
Let's consider two scenarios.
A(10) means that node A's last RAFT log id for user data is 10. RAFT metadata such as configuration changes is not counted.
Example 1.
When nodes [A, B, C, D, E, F, G] restart, the user would like the RAFT group to be formed from [A, B, C, D, E, F, G], so that data with log ids from 5 to 10 is not lost. Therefore we should not start on the last stable (or pending) topology, but rather wait for the original one.
Assert logicalTopology includes [A, B, C, D]
Example 2.
When nodes [A, B, C, D, E, F, G] restart, we would like the RAFT group to be formed from [E, F, G], so that new data that was written on [E, F, G] is not lost. The scheme below shows how the configuration changed over time.
The general idea of the enhancement is the following:
On node restart, nodes do not start HA partition RAFT groups immediately; they wait until the majority of nodes from the target_topology is alive, where target_topology is the last RAFT configuration in which the user's data was changed, OR the last graceful RAFT configuration change. For Example 1 the target_topology is [A, B, C, D, E, F, G], for Example 2 it is [E, F, G].
Actually, for Example 1 we need to wait for the majority of [A, B, C, D] to be alive, not of [A, B, C, D, E, F, G], because the RAFT log on [E, F, G] could contain some meta RAFT commands, and its last log id could be greater than the last log id of any node from [A, B, C, D]. Nodes [E, F, G] could have been added later by the rebalance mechanism and their data could have been cleaned up.
The most interesting part is how we calculate target_topology.
There are two questions for which we must find answers
Some ideas that could help find answers:
Any time the state machine receives a configuration update command, we persist its log id (let's call it lastCfgIdx). Similarly, any time we receive user commands, we persist that log id as well (let's call it lastUserCmdIdx).
On restart, we can wait for RAFT to apply the committed commands to the state machine, so the log ids become up-to-date, but not allow RAFT to start elections, so that it does not form a RAFT group yet.
The last step is to compare lastUserCmdIdx and lastCfgIdx: if lastUserCmdIdx > lastCfgIdx, then target_topology could be the configuration on which lastUserCmdIdx was applied.
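A minimal sketch of this bookkeeping, assuming hypothetical callbacks for applied configuration changes and user commands (the listener shape and the persist() helper are illustrative, not an existing API):

// Hypothetical tracker that persists the log index of the last configuration change
// and of the last user command, so they can be compared on restart.
class LogIndexTracker {
    private volatile long lastCfgIdx;
    private volatile long lastUserCmdIdx;

    void onConfigurationApplied(long logIndex) {
        lastCfgIdx = logIndex;
        persist("lastCfgIdx", logIndex);
    }

    void onUserCommandApplied(long logIndex) {
        lastUserCmdIdx = logIndex;
        persist("lastUserCmdIdx", logIndex);
    }

    /** True if user data was written after the last RAFT reconfiguration. */
    boolean hasUserInputsAfterLastCfg() {
        return lastUserCmdIdx > lastCfgIdx;
    }

    // Illustrative durable storage of the index; in practice this would go to local storage.
    private void persist(String key, long value) {
        // ...
    }
}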
https://issues.apache.org/jira/browse/IGNITE-23870
https://issues.apache.org/jira/browse/IGNITE-24089
For the purposes of HA track Phase 2, we introduce the AssignmentsChain abstraction:
AssignmentsChain(List<AssignmentsLink> chain):
    AssignmentsLink lastLink(String nodeId) // returns the last link where nodeId is present
    AssignmentsLink replaceLast(Assignments newLast, long term, long index) // replaces the last link in the chain with the new one
    AssignmentsLink addLast(Assignments newLast, long term, long index)

AssignmentsLink(Assignments assignments, long index, long term, AssignmentsLink next)
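A possible Java shape for this abstraction, modelled here as a list of links rather than a linked structure for brevity (field and type choices beyond the declaration above are assumptions of this sketch):

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Sketch of the chain: links are ordered from oldest to newest reconfiguration.
class AssignmentsLink {
    final Set<String> nodes;  // node names of this RAFT configuration
    final long index;         // log index of the configuration change
    final long term;          // RAFT term of the configuration change

    AssignmentsLink(Set<String> nodes, long index, long term) {
        this.nodes = nodes;
        this.index = index;
        this.term = term;
    }
}

class AssignmentsChain {
    private final List<AssignmentsLink> chain = new ArrayList<>();

    /** Returns the last (newest) link that contains the given node, or null if none does. */
    AssignmentsLink lastLink(String nodeId) {
        for (int i = chain.size() - 1; i >= 0; i--) {
            if (chain.get(i).nodes.contains(nodeId)) {
                return chain.get(i);
            }
        }
        return null;
    }

    void addLast(Set<String> newLast, long term, long index) {
        chain.add(new AssignmentsLink(newLast, index, term));
    }

    void replaceLast(Set<String> newLast, long term, long index) {
        chain.set(chain.size() - 1, new AssignmentsLink(newLast, index, term));
    }
}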
On every stablePartAssignmentsKey update we also update the assignmentsChainKey with the following logic:

currentAssignmentsChain = ms.getLocally(assignmentsChainKey)

if (!pendingAssignments.force() && !pendingAssignments.fromReset()):
    newChain = AssignmentsChain.of(term, index, newStable)
elif (!pendingAssignments.force() && pendingAssignments.fromReset()):
    newChain = currentAssignmentsChain.replaceLast(newStable, term, index)
else:
    newChain = currentAssignmentsChain.addLast(newStable, term, index)

ms.put(assignmentsChainKey, newChain)
On the start of every node in the HA scope, the main points that must be analysed are the following:
- Should this node start at all?
  - Simple case: one link in the chain; this must be processed as the usual case https://issues.apache.org/jira/browse/IGNITE-23885
  - Case when the pending or stable RAFT topology is alive (see the pseudocode below)
  - Case with the chain unwind to understand the current situation (see the pseudocode below)
- Should we clean up this node or not?
// method which collects and awaits alive request results from a list of nodes
Map<NodeId, Boolean> sendAliveRequests(List<String> nodes)

// true if the majority of nodes is alive
boolean majorityAlive(Map<NodeId, Boolean> responses)

cleanupAndFollowUsualRecoveryFlow()

// TODO: this check must be defined in the future
// https://issues.apache.org/jira/browse/IGNITE-23880
checkSelfAliveness()

onAliveRequestReceive:
    sendResponse(checkSelfAliveness())

onNodeStart:
    (stable, pending) = (ms.getLocally(stablePartAssignmentsKey), ms.getLocally(pendingPartAssignmentsKey))
    request = new StateRequest()

    if (majorityAlive(sendAliveRequests(stable)) || majorityAlive(sendAliveRequests(pending))):
        cleanupAndFollowUsualRecoveryFlow()
        return
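The aliveness fan-out can be sketched with CompletableFuture over a hypothetical per-node request helper (sendAliveRequest() is assumed, not an existing API; a request that completes exceptionally counts as "not alive"):

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class AlivenessCheck {
    // Sends an aliveness request to every node and counts positive answers;
    // a request that completes exceptionally is treated as a dead node.
    static boolean majorityAlive(List<String> nodes) {
        Map<String, CompletableFuture<Boolean>> responses = nodes.stream()
                .collect(Collectors.toMap(n -> n, n -> sendAliveRequest(n).exceptionally(err -> false)));

        long aliveCount = responses.values().stream()
                .map(CompletableFuture::join)
                .filter(Boolean::booleanValue)
                .count();

        return aliveCount >= nodes.size() / 2 + 1;
    }

    // Hypothetical single-node request; assumed to resolve to the peer's checkSelfAliveness() result.
    static CompletableFuture<Boolean> sendAliveRequest(String node) {
        throw new UnsupportedOperationException("illustrative stub");
    }
}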
Node aliveness is a separate non-trivial topic to discuss. The high-level vision: a node is alive if it has an appropriate peer configuration and is functioning well. We need to formalise this definition better.
So, if neither the pending nor the stable RAFT topology is alive, we need to start the chain unwind process. It means that we need to find the link in the chain which is the last one (in the chronological sense) that has user inputs.
bool checkIfChainHasUserInputs():
    fullChain = ms.getLocally(assignmentsChainKey)
    currentNodeLink = fullChain.lastLink(currentNode)
    currentNodeCfgIndex = currentNodeLink.index
    nodesToSend = currentNodeLink.next.nodes() - currentNodeLink.next.next.nodes()
    hasUserInputsRequest = new HasUserInputsRequest(currentNodeCfgIndex)
    return anyMatchTrue(sendHasUserInputsRequest(nodesToSend, hasUserInputsRequest))

// request for the user inputs after the last RAFT group reconfiguration
HasUserInputsRequest(cfgIndex)

Map<NodeId, Boolean> sendHasUserInputsRequest(List<String> nodes, HasUserInputsRequest request)
Node response to the HasUserInputsRequest:
onHasUserInputsRequestReceived(request):
    if (hasUserInputs(cfgIndex)):
        sendResponse(true)
    else:
        sendResponse(checkIfChainHasUserInputs())

// https://issues.apache.org/jira/browse/IGNITE-23882
hasUserInputs(cfgIndex):
    startRaftOnFakeConfiguration()
    replayRaftLog()
    return lastUserCmdIdx > cfgIndex
And as the last step we need to handle the state responses:
if (checkIfChainHasUserInputs()):
    eraseLocalData()
elif (hasUserInputs(cfgIndex)):
    startNode()
else:
    eraseLocalData()
A short version of the algorithm (see the method descriptions above):
onNodeStart:
    (stable, pending) = (ms.getLocally(stablePartAssignmentsKey), ms.getLocally(pendingPartAssignmentsKey))
    request = new StateRequest()

    if (majorityAlive(sendStateRequest(stable)) || majorityAlive(sendStateRequest(pending))):
        cleanupAndFollowUsualRecoveryFlow()
        return

    if (checkIfChainHasUserInputs()):
        eraseLocalData()
    elif (hasUserInputs(cfgIndex)):
        startNode()
    else:
        eraseLocalData()

onAliveRequestReceive:
    sendResponse(checkSelfAliveness())

onHasUserInputsRequestReceived(request):
    if (hasUserInputs(cfgIndex)):
        sendResponse(true)
    else:
        sendResponse(checkIfChainHasUserInputs())
- In general, we can piggyback the hasUserInputs flag on the first aliveness checks and use a single request both to check aliveness and to receive the user inputs.
- We can remove redundant requests if we prepare the full list of target nodes in pending/stable/chain and send only one request to each node.
- If we find one node with user inputs, we can stop without waiting for the remaining responses.
Phase 1 epic tickets
Phase 2 epic tickets