IDIEP-131
Author
Sponsor

Alexander 

Kirill Gusakov 

Kirill Sizov

Mikhail Efremov

Created

 

Status

DRAFT


Motivation

Allow users to choose between HA (high availability) and SC (strong consistency) modes for partitions. The SC mode corresponds to the current behavior in Ignite 3, where strong data consistency is maintained, and any loss of majority leads to unavailability. In the HA mode, users will be able to read and write data from the surviving nodes even when the majority is lost, accepting the potential trade-off of possible data loss.

Ignite 3 uses RAFT as a consensus algorithm to achieve consistency and availability for data. The fundamental restriction of such algorithms is that a replication group of 2 * F + 1 nodes survives only F failed nodes. In our case that means a partition becomes unavailable if the majority of nodes from the partition's assignments are unavailable. With this approach we ensure that the partition is strongly consistent (SC).
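For illustration only, here is a minimal sketch (not the Ignite 3 implementation) of the majority rule described above: a group of 2 * F + 1 nodes stays available while at least F + 1 assigned nodes are alive.

import java.util.Set;

/** Minimal sketch of the RAFT majority rule: 2 * F + 1 nodes tolerate at most F failures. */
final class MajorityCheck {

    /** Returns true if enough assigned nodes are alive to form a majority. */
    static boolean hasMajority(Set<String> assignedNodes, Set<String> aliveNodes) {
        long alive = assignedNodes.stream().filter(aliveNodes::contains).count();

        return alive >= assignedNodes.size() / 2 + 1;
    }

    public static void main(String[] args) {
        // 3 replicas (F = 1): losing one node keeps the partition available in SC mode...
        System.out.println(hasMajority(Set.of("B", "C", "D"), Set.of("B", "C"))); // true
        // ...but losing two nodes loses the majority, and the partition becomes unavailable.
        System.out.println(hasMajority(Set.of("B", "C", "D"), Set.of("B")));      // false
    }
}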

In contrast, Ignite 2 provides flexibility through TRANSACTIONAL caches with FULL_SYNC (the client node waits for a write or commit to complete on all participating remote nodes) and PRIMARY_SYNC (the client node waits for a write or commit to complete on the primary node only, not on the backups) modes, allowing users to prioritize availability over strict consistency when necessary. This flexibility enables read and write operations even when the majority of nodes is lost, but with potential trade-offs in data consistency.
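For reference, this is how that trade-off is expressed with the Ignite 2 cache configuration API (a fragment shown only to illustrate the behaviour that HA mode aims to approximate):

import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.configuration.CacheConfiguration;

CacheConfiguration<Integer, String> cacheCfg = new CacheConfiguration<>("exampleCache");

cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);

// FULL_SYNC: the client waits for writes/commits to complete on all participating remote nodes.
cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);

// PRIMARY_SYNC: the client waits only for the primary node; backups are updated asynchronously.
// cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC);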

Description

Use Cases

Example scenario illustrating HA mode functionality

Consider a cluster consisting of nodes: A, B, C, and D.

  • Node A hosts the Cluster Management Group (CMG) and Meta Storage (MS) and is considered out of scope of this example.
  • Nodes B, C, and D host a particular data partition.

Scenario steps:

  1. Loss of nodes C and D:
    • We aim to continue performing RW transactions on the surviving node B.
    • Even if the data on node B is not up-to-date, the system allows operations based on its current state.
    • This ensures that the cluster can continue to function with minimal disruption.
    • After a short, configurable period, new data can be written to node B while nodes C and D remain down.
  2. User adds data to node B.
  3. Recovery of nodes C and D:
    • Nodes C and D come back online.
    • The user acknowledges that the data on nodes C and D might be more up-to-date than the data on node B prior to their failure, because a majority of [B, C, D] could have been formed by [C, D] alone for some data insertions.
    • Trade-off: The data from nodes C and D may be lost or overwritten, as the system reconciles the partition state based on node B's data.
    • After a dataNodesAutoAdjustScaleUp period, nodes C and D rejoin as data nodes of the partition after a full partition cleanup.
  4. Stopping all nodes:
    • All nodes in the cluster are stopped and the cluster is shut down.
  5. Restarting node A and node D:
    • Node A (with the CMG/MS) and node D are brought back online.
    • The user waits until a majority of nodes [B, C, D] is back, or performs manual partition disaster recovery.
  6. Reintroduction of nodes B and C:
    • Nodes B and C are brought back online.
    • They rejoin the partition's data nodes.

Key Considerations:

  • Data Consistency vs. Availability:
    • In HA mode, the user accepts that some data may be lost or overwritten when nodes rejoin the cluster.
    • The priority is to maintain system availability and allow RW transactions to continue, even if it means operating with potentially stale data.
  • No Immediate replica factor restoration:
    • The system does not attempt to immediately recover the original replication factor when nodes fail.
    • This avoids triggering parasitic data rebalances that can burden the network and impact performance.

Requirements

General requirements are the following: 

  • We want the ability to choose between HA and SC modes for partitions/zones to handle situations when some nodes are unavailable and the majority of the RAFT group for a table's partition is lost.
    • In HA mode, if the majority is lost, we still want to be able to read and write data from the surviving nodes.
    • SC mode is the current behavior of Ignite 3, where losing the majority leads to unavailability.
  • From a user's perspective, in HA mode the absence of a majority after a failure must be handled seamlessly.
  • The behavior must be as close as possible to TRANSACTIONAL, FULL_SYNC caches in Ignite 2.
  • By default, Ignite 3 is Strongly Consistent.
  • A rolling cluster restart in HA mode needs to be seamless from the user perspective, with no latency spikes.

Assumptions and possible trade-offs 

CMG/MS majority preservation:

  • Initially, we can focus on data partition availability only, assuming there are 3 or more nodes in the CMG/MS so that the failure of one node does not lead to a majority loss. This allows us to handle partition availability first and later maybe generalize the approach to include CMG/MS availability.

Stable topology assumption:

  • We assume that the Ignite 3 cluster operates under a stable network. If there is a lost connection between two nodes, we expect that this connectivity issue will be resolved in a short period of time.

Possible node cleanup on rejoin and rebalance:

  • In HA mode, when a node rejoins a RAFT group, we introduce a possible node cleanup followed by a full rebalance, if the node's data has diverged from the rest of the RAFT group.

Acceptable unavailability duration:

  • While Ignite 2 theoretically does not experience unavailability during restarts, in practice there is a failure detection reaction time and a partition map exchange (PME), which causes a pause in processing. We expect similar behavior in Ignite 3. Therefore, we need to consider what duration of unavailability is acceptable from a user perspective.

Public API Changes


We introduce new params in the zone’s description: 

  • A consistencyMode parameter that switches a zone between the SC and HA modes.
  • One new timeout is introduced: partitionDistributionResetTimeout.


So, SC mode is the current behaviour of a zone with scale up/scale down timers, when assignments are recalculated only after the dataNodesAutoAdjustScaleUp/dataNodesAutoAdjustScaleDown timeouts fire.

In addition to these timers, in HA mode partitionDistributionResetTimeout controls how quickly the system adjusts to node removals in order to maintain availability.

It is reasonable to recommend the following values for the zone timers:

  • The newly introduced partitionDistributionResetTimeout should be in the range of several seconds
  • dataNodesAutoAdjustScaleUp, which is responsible for data nodes adjustment on node joins, is expected to be in the range of tens of minutes to several hours
  • dataNodesAutoAdjustScaleDown, which is responsible for data nodes reduction when nodes leave, should be in the range of days, or even infinite.
public class CatalogZoneDescriptor extends CatalogObjectDescriptor {
    /**
     * Specifies the consistency mode of the zone, determining how the system balances data consistency and availability.
     *
     * <p>Possible values are:
     * <ul>
     *   <li><b>STRONG_CONSISTENCY</b>: Ensures strong consistency by requiring a majority of nodes for operations.
     *       Partitions become unavailable if the majority of assigned nodes are lost.</li>
     *   <li><b>HIGH_AVAILABILITY</b>: Prioritizes availability over strict consistency, allowing partitions to remain
     *       available for read-write operations even when the majority of assigned nodes are offline. The system
     *       reconfigures the RAFT group to include only the available nodes without changing the assignments.</li>
     * </ul>
     */
    private final ConsistencyMode consistencyMode;

    /**
     * The timeout after which the system checks partition assignments. If the majority of assigned nodes are lost,
     * it reconfigures the RAFT group to exclude offline nodes and restore availability, without changing the assignments.
     */
    private final int partitionDistributionResetTimeout;
}


The corresponding SQL syntax must be enhanced with the new parameters as well:

CREATE ZONE IF NOT EXISTS exampleZone WITH CONSISTENCY_MODE='HIGH_AVAILABILITY', PARTITION_DISTRIBUTION_RESET_TIMEOUT=1


ALTER ZONE IF EXISTS exampleZone SET PARTITION_DISTRIBUTION_RESET_TIMEOUT=5


If the user does not specify CONSISTENCY_MODE, by default it will be SC.
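For illustration, a HA zone with the recommended timer magnitudes could be created as follows. This is a sketch that assumes an Ignite 3 instance `ignite`, the ignite.sql().execute API, and that the timeouts are specified in seconds; the zone name and concrete values are examples only.

ignite.sql().execute(null,
        "CREATE ZONE IF NOT EXISTS exampleHaZone WITH "
                + "CONSISTENCY_MODE='HIGH_AVAILABILITY', "
                + "PARTITION_DISTRIBUTION_RESET_TIMEOUT=5, "      // several seconds
                + "DATA_NODES_AUTO_ADJUST_SCALE_UP=3600, "        // tens of minutes to hours
                + "DATA_NODES_AUTO_ADJUST_SCALE_DOWN=604800");    // days, or effectively infinite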

Let's describe the algorithm step by step, which will clarify the meaning and usage of this timer.


Algorithm for phase 1: 

Every node operates according to the following:


metastoreTopologyChangeEventReceived:
  onNodeLeft:
    initPartitionDistributionResetTimer(revision)
    initDataNodesAutoAdjustScaleDownTimer()

  onNodeJoined:
    initDataNodesAutoAdjustScaleUpTimer()

  onTopologyLeap:
    initPartitionDistributionResetTimer()
    initDataNodesAutoAdjustScaleUpTimer()
    initDataNodesAutoAdjustScaleDownTimer()

onPartitionDistributionResetTimeout:
  revision = getRevisionFromEvent()
  for t in zoneTables:
    for p in tablePartitions:
      if !checkMajority(p, revision):
        resetPartition(p)

nodeStarted:
  (stable, pending) = (getCurrentStableAssignmentsFromMs(), getCurrentPendingAssignmentsFromMs())
  // pending.force flag is a result of resetPartition,
  // see disaster recovery protocol
  if currentNode in union(stable, pending) && !pending.force:
    startRaftNode()
  else:
    if (pending.force):
      assert pending.size == 1 // see resetPartition requirements
      startRaftNode()
    else:
      skip


// Definitions.
// If you can't find any functions here,
// it's supposed that their behaviour is trivial.

initPartitionDistributionResetTimer(revision):
  // in the case of a concurrent set,
  // the new timer must reschedule the old one with the new revision
  saveTimerIntent(revision, currentTime())

checkMajority(partition, revision):
  assignments = getCurrentAssignmentsFromStableMetastoreKey(partition)
  aliveAssignments = assignments.intersect(getLogicalTopology(revision))
  return aliveAssignments.size() >= 1 + assignments.size() / 2

resetPartition(partition):
  /*
  It will be the reworked method from the disaster recovery feature (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=315493878).
  Requirements:
  - Leader hijack protection must be already implemented. Analogue of IGNITE-22904 must be implemented for non-metastore partitions.
  - Reset must work in 2 phases: (planned=targetTopology, pending=singleAliveNode). Where targetTopology is the alive nodes from the current stable assignments.
  - Only alive nodes must be placed in the planned key. 
  */

initDataNodesAutoAdjustScaleUpTimer() // see here https://cwiki.apache.org/confluence/display/IGNITE/IEP-101%3A+Distribution+Zones
initDataNodesAutoAdjustScaleDownTimer() // see here https://cwiki.apache.org/confluence/display/IGNITE/IEP-101%3A+Distribution+Zones


Key outcomes: 

  1. When the majority is lost, the partition becomes available again after a short, configurable period.
  2. After a rolling restart, we need to wait until the majority of nodes from the last partition topology handled before the restart comes back. Phase 2 below addresses this limitation.


Additional notes:

  1. Change rebalance scheduling when data nodes are changed
    1. In HA mode, for scale-up situations:
      1. Let's say we have [A, B, C] as the partition assignments, and B and C left.
      2. The RAFT group was narrowed in a forced manner to [A]; after that, node B returned, so we must widen the stable assignments to [A, B].
      3. In terms of DZ.scale up, there wasn't any change within the DZ.scale up time window, so the data nodes stay the same. This could suggest that we don't need to schedule a new rebalance to widen the stable assignments to [A, B], but we actually do. (Note that the DZ.scale down timer is quite large and hasn't even elapsed.)
    2. Proposed enhancements (see the sketch after this list):
      1. Data nodes are rewritten on scale up even if they are the same.
      2. When we decide whether to trigger a rebalance after a data nodes change, we calculate the assignments and apply a node-aliveness filter to them. If the actual stablePartAssignmentsKey differs from the filtered assignments, we schedule a rebalance.
      3. In our example, the calculated assignments will be [A, B, C]; we filter them to [A, B] and schedule a new rebalance to widen stablePartAssignmentsKey.
  2. A special case, when all nodes from a partition's assignments are unavailable, is out of scope. In this scenario the partition must be treated as lost and handled appropriately (not implemented yet).
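A minimal sketch of the proposed rebalance trigger check from note 1.2 above (the class, method, and parameter names are illustrative, not the actual Ignite 3 code):

import java.util.Set;
import java.util.stream.Collectors;

/** Illustrative check: schedule a rebalance if the alive subset of the calculated
 *  assignments differs from the current stable assignments. */
final class RebalanceTriggerSketch {

    static boolean needRebalance(Set<String> calculatedAssignments, Set<String> aliveNodes, Set<String> stableAssignments) {
        Set<String> filtered = calculatedAssignments.stream()
                .filter(aliveNodes::contains)
                .collect(Collectors.toSet());

        // Example from the note above: calculated = [A, B, C], alive = [A, B], stable = [A]
        // -> filtered = [A, B] != stable -> schedule a rebalance to widen stable to [A, B].
        return !filtered.equals(stableAssignments);
    }
}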


Phase 2

Let’s call the previously described algorithm as Phase 1 of the current feature.

In Phase 1, when the partition recovers its majority and nodes start after a rolling restart, it is possible that some user data will be lost.

For example, when nodes leave one by one, the last remaining node may not have up-to-date data because it could be a lagging follower. As stated in Phase 1, the cluster will wait for that node, the RAFT group will be recovered on it, and this will cause data loss.

In Phase 2 we try to address this rolling restart issue, so that the RAFT group has the last up-to-date state after a rolling restart.

Let's consider two scenarios.

A(10) means that the last RAFT log id of the user's data on node A is 10. We do not count RAFT meta entries such as configuration changes, etc.

Example 1

  1. p1.assignments.stable = [A(10)]
  2. [B, C, D, E, F, G] started
  3. p1.changePeers(A,B,C,D,E,F,G) // Graceful change peers
  4. p1.assignments.stable = [A(10), B(10), C(10), D(10), E(5), F(5), G(5)] 
    1. [E, F, G] are lagged
  5. [A, B, C, D] stopped
  6. p1.assignments.pending = [E, forced = true], p1.assignments.planned = [E, F, G]
  7. p1.assignments.stable = [E(5), F(5), G(5)]
  8. A restarted -> scale up -> changePeersAsync
  9. [E, F] stopped
  10. p1.assignments.pending = [G, forced = true]
  11. p1.assignments.stable = [G(5)]
  12. [G] stopped.

When nodes [A, B, C, D, E, F, G] restart, the user would like the RAFT group to be formed from [A, B, C, D, E, F, G], so that data from 5 to 10 won't be lost. So we should not start on the last stable (or pending) topology, but rather wait for the original one.

Assert logicalTopology includes [A, B, C, D]

Example 2.

  1. p1.assignments.stable = [A(10)] 
  2. [B, C, D, E, F, G] started
  3. p1.changePeers(A, B, C, D, E, F, G) // Graceful change peers
  4. p1.assignments.stable = [A(10), B(10), C(10), D(10), E(5), F(5), G(5)] 
    1. [E, F, G] are lagged
  5. [A, B, C, D] stopped
  6. p1.assignments.pending = [E, forced = true], p1.assignments.planned = [E, F, G]
  7. p1.assignments.stable = [E(5), F(5), G(5)]
  8. p1.insert() -> [E(7), F(7), G(5)]
  9. [E, F] stopped
  10. p1.assignments.pending = [G, forced = true]
  11. p1.assignments.stable = [G(5)]
  12. [G] stopped.

When nodes [A, B, C, D, E, F, G] restart, we would like to have a RAFT group formed from [E, F, G], so that the new data that was written on [E, F, G] won't be lost.


The general idea of the enhancement is the following:

On node restart, nodes will not start HA partition RAFT groups and will wait until the majority of nodes from the target_topology is alive, where target_topology is the last RAFT configuration where the user's data was changed, OR the last graceful RAFT configuration change. For Example 1, target_topology will be [A, B, C, D, E, F, G]; for Example 2 it will be [E, F, G].

Actually, for Example 1 we need to wait for the majority to be alive not among [A, B, C, D, E, F, G], but among [A, B, C, D], because the RAFT log on [E, F, G] could be filled with some meta RAFT commands, and its last log id could be greater than the last log id of any node from [A, B, C, D]. Nodes [E, F, G] could be added later within the rebalance mechanism and their data could be cleaned up.


The most interesting part is how we calculate target_topology.

There are two questions for which we must find answers

  1. How to find the last graceful raft configuration change consistently on any node?
  2. How to find the last raft configuration where the user's data was changed?


Some ideas that could help find answers:

  1. To find the last graceful reconfiguration, we can iterate through the stable assignments in the metastore. For the last element of such a chain, we can check the actual RAFT configuration against the pending assignments. To simplify the idea, imagine traversing the history of changes from the bottom right to the top left.
  2. To find the last RAFT configuration where the user's data was changed, we can track the log id of the last user command applied to the state machine. There are several types of commands that are applied to a partition's RAFT group:
    1. "In-RAFT commands", like configuration updates, etc.
    2. "Ignite meta commands", like SafeTimePropagatingCommand, BuildIndexCommand, PrimaryReplicaChangeCommand, CatalogCompactionCmd, etc.
    3. "User commands", like Update/UpdateAllCommand, FinishTxCommand, etc.

Any time the state machine receives a configuration update command, we persist its log id (let's name it lastCfgIdx). Also, any time we receive a "User command", we persist its log id as well (let's name it lastUserCmdIdx).

On restart, we can let RAFT apply the committed commands to the state machine, so the log ids will be up-to-date, but we do not allow RAFT to start an election, so that it cannot form a RAFT group yet.

The last step is to compare lastUserCmdIdx and lastCfgIdx: if lastUserCmdIdx > lastCfgIdx, then target_topology could be the configuration on which lastUserCmdIdx was applied.
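A minimal sketch of this bookkeeping, assuming a hypothetical tracker inside the partition state machine (the class and method names are illustrative, not the actual listener API):

/** Illustrative bookkeeping for target_topology detection (not the real state machine API). */
final class TargetTopologyTracker {
    private long lastCfgIdx;      // log index of the last applied configuration change, persisted
    private long lastUserCmdIdx;  // log index of the last applied "User command", persisted

    void onConfigurationApplied(long logIndex) {
        lastCfgIdx = logIndex;
    }

    void onCommandApplied(Object command, long logIndex) {
        // "User commands" per the classification above: Update/UpdateAllCommand, FinishTxCommand, etc.
        // "Ignite meta commands" (SafeTimePropagatingCommand, BuildIndexCommand, ...) are ignored here.
        if (isUserCommand(command)) {
            lastUserCmdIdx = logIndex;
        }
    }

    /** If true, the configuration on which lastUserCmdIdx was applied is a target_topology candidate. */
    boolean userDataWrittenAfterLastReconfiguration() {
        return lastUserCmdIdx > lastCfgIdx;
    }

    private boolean isUserCommand(Object command) {
        return false; // placeholder: in the real code this would be instanceof checks against the command classes
    }
}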

Detailed algorithm on node restart:

Chain of assignments

https://issues.apache.org/jira/browse/IGNITE-23870

https://issues.apache.org/jira/browse/IGNITE-24089

For the purposes of HA Phase 2 we are introducing the AssignmentsChain abstraction:

AssignmentsChain(List<AssignmentsLink> chain):
    AssignmentsLink lastLink(String nodeId) // returns the last link where nodeId is present

    AssignmentsLink replaceLast(Assignments newLast, long term, long index) // replaces the last link in the chain with the new one

    AssignmentsLink addLast(Assignments newLast, long term, long index) // appends a new link to the chain

AssignmentsLink(Assignments assignments, long index, long term, AssignmentsLink next)
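A minimal Java sketch of the chain structure described above (field and method names follow the pseudocode; this is illustrative, and the real implementation may differ, e.g. links may keep an explicit next pointer):

import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch of the chain of stable assignments with the RAFT term/index of each reconfiguration. */
class AssignmentsChain {
    private final List<AssignmentsLink> chain = new ArrayList<>();

    /** A fresh single-link chain, used when a regular (non-forced) rebalance produces new stable assignments. */
    static AssignmentsChain of(long term, long index, List<String> assignments) {
        AssignmentsChain result = new AssignmentsChain();
        result.addLast(assignments, term, index);
        return result;
    }

    /** Returns the last link where nodeId is present, or null if the node never appears in the chain. */
    AssignmentsLink lastLink(String nodeId) {
        for (int i = chain.size() - 1; i >= 0; i--) {
            if (chain.get(i).assignments().contains(nodeId)) {
                return chain.get(i);
            }
        }
        return null;
    }

    /** Replaces the last link in the chain with a new one. */
    AssignmentsLink replaceLast(List<String> newLast, long term, long index) {
        chain.remove(chain.size() - 1);
        return addLast(newLast, term, index);
    }

    /** Appends a new link to the chain. */
    AssignmentsLink addLast(List<String> newLast, long term, long index) {
        AssignmentsLink link = new AssignmentsLink(newLast, index, term);
        chain.add(link);
        return link;
    }

    record AssignmentsLink(List<String> assignments, long index, long term) {}
}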


On every stablePartAssignmentsKey update, we also update the assignmentsChainKey with the following logic:


currentAssignmentsChain = ms.getLocally(assignmentsChainKey)
if (!pendingAssignments.force() && !pendingAssignments.fromReset()):
    newChain = AssignmentsChain.of(term, index, newStable)
elif (!pendingAssignments.force() && pendingAssignments.fromReset()):
    newChain = currentAssignmentsChain.replaceLast(newStable, term, index)
else:
    newChain = currentAssignmentsChain.addLast(newStable, term, index) 
ms.put(assignmentsChainKey, newChain) 
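For instance, under this logic the chain for Example 2 above could evolve as follows. This is a hedged reading of the pseudocode; the term/index values are made up for illustration and reuse the AssignmentsChain sketch from the previous section.

// Step 4: graceful change peers -> pending is neither forced nor from a reset -> a fresh single-link chain.
AssignmentsChain chain = AssignmentsChain.of(1, 10, List.of("A", "B", "C", "D", "E", "F", "G"));

// Step 6: forced pending [E] applied -> force flag is set -> a new link is appended.
chain.addLast(List.of("E"), 2, 11);

// Step 7: the planned part [E, F, G] is applied as pending with fromReset -> the last link is replaced.
chain.replaceLast(List.of("E", "F", "G"), 2, 12);

// Steps 10-11: forced pending [G] -> another link is appended.
chain.addLast(List.of("G"), 3, 13);

// Resulting chain: [A..G] -> [E, F, G] -> [G]. Unwinding it on restart should lead back to
// [E, F, G], the last configuration where user data was written (the insert at step 8).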


Start of the node


On the start of every node in the HA scope, the main points which must be analysed are:
- Should this node start at all
    - Simple case: one link in the chain, processed as the usual case https://issues.apache.org/jira/browse/IGNITE-23885
    - Case when the pending or stable raft topology is alive (see the pseudocode below)
    - Case with the chain unwind to understand the current situation (see the pseudocode below)
- Should we clean up this node or not

Pending or stable raft topology is alive


// method which collects and awaits alive request results from a list of nodes
Map<NodeId, Boolean> sendAliveRequests(List<String> nodes)
// true if the majority of nodes is alive
boolean majorityAlive(Map<NodeId, Boolean> responses)
cleanupAndFollowUsualRecoveryFlow()
// TODO: this check must be defined in the future
// https://issues.apache.org/jira/browse/IGNITE-23880
checkSelfAliveness()

onAliveRequestReceive:
    sendResponse(checkSelfAliveness())

onNodeStart:
    (stable, pending) = (ms.getLocally(stablePartAssignmentsKey), ms.getLocally(pendingPartAssignmentsKey))

    if (majorityAlive(sendAliveRequests(stable)) || majorityAlive(sendAliveRequests(pending))):
       cleanupAndFollowUsualRecoveryFlow()
       return


Node aliveness is a separate, non-trivial topic to discuss. The high-level vision: a node is alive if it has an appropriate peer configuration and is functioning well. We need to formalise this definition better.

Chain unwind

So, if neither the pending nor the stable raft topology is alive, we need to start the chain unwind process.
That means we need to find the link in the chain which is the last one (in the chronological sense) that has user inputs.


bool checkIfChainHasUserInputs():
    fullChain = ms.getLocally(assignmentsChainKey)
    currentNodeLink = fullChain.lastLink(currentNode)
    currentNodeCfgIndex = currentNodeLink.index

    nodesToSend = currentNodeLink.next.nodes() - currentNodeLink.next.next.nodes()
    hasUserInputsRequest = new HasUserInputsRequest(currentNodeCfgIndex)
    return anyMatchTrue(sendHasUserInputsRequest(nodesToSend, hasUserInputsRequest))

// request for the user inputs after the last RAFT group reconfiguration
HasUserInputsRequest(cfgIndex)
Map<NodeId, Boolean> sendHasUserInputsRequest(List<String> nodes, HasUserInputsRequest request)


Node response to the HasUserInputsRequest:

onHasUserInputsRequestReceived(request):
    if (hasUserInputs(request.cfgIndex)):
        sendResponse(true)
    else:
        sendResponse(checkIfChainHasUserInputs())

// https://issues.apache.org/jira/browse/IGNITE-23882
hasUserInputs(cfgIndex):
    startRaftOnFakeConfiguration()
    replayRaftLog()
    return lastUserCmdIdx > cfgIndex


Handle state responses


As the last step, we need to handle the state responses:

    if (checkIfChainHasUserInputs()):
        eraseLocalData()
    elif (hasUserInputs(cfgIndex)):
        startNode()
    else:
        eraseLocalData()


Result algorithm


A short version of the algorithm; see the method descriptions above:


onNodeStart:
    (stable, pending) = (ms.getLocally(stablePartAssignmentsKey), ms.getLocally(pendingPartAssignmentsKey))

    if (majorityAlive(sendAliveRequests(stable)) || majorityAlive(sendAliveRequests(pending))):
       cleanupAndFollowUsualRecoveryFlow()
       return

    if (checkIfChainHasUserInputs()):
        eraseLocalData()
    elif (hasUserInputs(cfgIndex)):
        startNode()
    else:
        eraseLocalData()

onAliveRequestReceive:
    sendResponse(checkSelfAliveness())

onHasUserInputsRequestReceived(request):
    if (hasUserInputs(request.cfgIndex)):
        sendResponse(true)
    else:
        sendResponse(checkIfChainHasUserInputs())


Optimisations

- In general, we can piggyback the hasUserInputs flag on the first aliveness checks and use a single request both to check aliveness and to receive the user inputs flag.
- We can remove redundant requests if we prepare the full list of target nodes across pending/stable/chain and send only one request to each node.
- If we find one node with user inputs, we can stop without waiting for the other responses.
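A possible shape for the combined message from the first optimisation (the names are hypothetical; checkSelfAliveness and hasUserInputs stand for the functions from the pseudocode above and are stubbed here):

final class CombinedStateCheck {
    /** Hypothetical combined request: aliveness plus "any user inputs after cfgIndex?" in one round trip. */
    record StateRequest(long cfgIndex) {}

    record StateResponse(boolean alive, boolean hasUserInputs) {}

    StateResponse onStateRequestReceived(StateRequest request) {
        // Answers both questions with a single response, saving one network round trip per node.
        return new StateResponse(checkSelfAliveness(), hasUserInputs(request.cfgIndex()));
    }

    // Stubs standing in for the functions defined in the pseudocode above.
    private boolean checkSelfAliveness() { return true; }

    private boolean hasUserInputs(long cfgIndex) { return false; }
}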

  

Risks and Assumptions

// Describe project risks, such as API or binary compatibility issues, major protocol changes, etc.

Discussion Links

// Links to discussions on the devlist, if applicable.

Reference Links

// Links to various reference documents, if applicable.

Tickets

Phase 1 epic tickets  




Phase 2 epic tickets 



