Notes: 

  • The following notation is used: words written in italics and without spaces denote class names without a package name, for example, GridCacheMapEntry

Contents:

(Partition Map) Exchange

PME is the process of exchanging partition information between nodes. Its goal is to establish an up-to-date view of partition state across all cluster nodes.

Triggers

Events which cause an exchange

Topology events:

  • Node Join (EVT_NODE_JOINED) - a new node is discovered and joins the topology (the exchange is done after the node is included into the ring). This event doesn't trigger the PME if a thick client connects to the cluster and the Ignite version is 2.8 or later.
  • Node Left (EVT_NODE_LEFT) - graceful shutdown via an ignite.close() call. This event doesn't trigger the PME in Ignite 2.8 and later versions if a node belonging to the existing baseline topology leaves.
  • Node Failed (EVT_NODE_FAILED) - an unresponsive node was detected; it probably crashed and is considered failed

Custom events:

  • Activation / Deactivation / Baseline topology set - ChangeGlobalStateMessage 
  • Dynamic cache start / Dynamic cache stop - DynamicCacheChangeBatch
  • Snapshot create / restore - SnapshotDiscoveryMessage
  • Global WAL enable / disable - WalStateAbstractMessage
  • Late affinity assignment - CacheAffinityChangeMessage


Exchange process

Partition Map Exchange process is the following:

  •  Non-coordinator nodes send their local state (GridDhtPartitionsSingleMessage)
  •  The coordinator merges the received local partition information and builds the partition full map
  •  The coordinator sends the full map to the other nodes (GridDhtPartitionsFullMessage)

Phase 1. Accumulation of partition state

A cluster membership change causes a major topology version update (e.g. 5.0→6.0); cache creation or destruction causes a minor version update (e.g. 6.0→6.1).

Initialization of an exchange means adding a GridDhtPartitionsExchangeFuture to a queue on each node.

Putting futures into this queue (GridCachePartitionExchangeManager.ExchangeWorker#futQ) is done from the discovery thread.

'Exchange worker' thread manages this queue. Using only one exchange worker thread per node provides a strict processing order for futures.
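
The single-threaded queue processing can be illustrated with a minimal sketch. All names here (ExchangeWorkerSketch, ExchangeTask, offerFromDiscoveryThread) are hypothetical; the real worker is GridCachePartitionExchangeManager.ExchangeWorker and is far more involved:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

// Minimal sketch of a single exchange-worker thread draining a FIFO queue.
public class ExchangeWorkerSketch {
    static final class ExchangeTask {
        final String description;
        ExchangeTask(String description) { this.description = description; }
    }

    // The discovery thread puts tasks here; only the single worker thread takes them.
    private final BlockingQueue<ExchangeTask> futQ = new LinkedBlockingDeque<>();

    void offerFromDiscoveryThread(ExchangeTask task) {
        futQ.offer(task);
    }

    void startWorker() {
        Thread worker = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    ExchangeTask task = futQ.take(); // strict FIFO processing order
                    System.out.println("init() for " + task.description);
                    // ... run the exchange future's init() and wait for it to complete ...
                }
            }
            catch (InterruptedException ignored) {
                // worker stopped
            }
        }, "exchange-worker");

        worker.setDaemon(true);
        worker.start();
    }

    public static void main(String[] args) throws Exception {
        ExchangeWorkerSketch sketch = new ExchangeWorkerSketch();
        sketch.startWorker();
        sketch.offerFromDiscoveryThread(new ExchangeTask("exchange on topology version 5.0"));
        sketch.offerFromDiscoveryThread(new ExchangeTask("exchange on topology version 5.1"));
        Thread.sleep(200); // let the worker print both tasks in order
    }
}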

First of all, the exchange worker thread runs init() on this exchange future.

The following important actions happen in this phase:

Exchange type determination

Basically there are 3 types of exchange - ALL, CLIENT and NONE.

We briefly describe the CLIENT and NONE types here, as they are much simpler and shorter than the ALL type.

A CLIENT exchange happens on client nodes. It doesn't have any distributed phase: the client node just sends a SingleMessage to the coordinator and receives a FullMessage when the coordinator has finished the exchange.

A NONE exchange happens on server and client nodes when the node join / left event is triggered by a client node. A NONE exchange just updates the topology version. A client node that joins the cluster uses the CLIENT exchange type.

Everything that follows applies to the ALL exchange type.

Cache actions

Dynamic cache start / Activation case:

All caches that need to be started (listed in org.apache.ignite.internal.processors.cache.ExchangeActions#cacheStartRequests) are started here (org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager#onCacheChangeRequest).

Dynamic cache stop case:

All caches that need to be stopped (listed in org.apache.ignite.internal.processors.cache.ExchangeActions#cacheStopRequests) have their public API closed to prevent new updates here (org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager#processCacheStopRequests).

Node join case:

If the joining node has newly configured caches (that did not exist before this exchange), such caches are additionally started on all other nodes (org.apache.ignite.internal.processors.cache.GridCacheProcessor#startReceivedCaches).

On the joining node, all cluster and node caches are started (org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture#initCachesOnLocalJoin).

Other cases:

No additional cache actions are required in this phase.

Affinity actions

For all started caches, the coordinator node calculates affinity for the 'initialVersion'. Non-coordinator nodes also try to calculate affinity for the 'initialVersion' (@see org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager#canCalculateAffinity); if that is not possible, they fetch the affinity from the coordinator.

After all necessary caches are started and the initial affinity is initialized, we set the current exchange future as the topology future (@see org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopologyImpl#topReadyFut) on our Topology (@see org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture#updateTopologies).

The topology future is used for mapping atomic updates, transactions, and explicit locks. Each atomic operation or transaction should be associated with some topology version on which it is performed, or be remapped to the next topology version. Each operation obtains the topology future under the topology read lock (e.g. @see org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockFuture#mapOnTopology). If the future is already completed, the operation proceeds using the topology version from this future. If the future is not completed, the operation asynchronously waits for its completion and remaps onto this future's version.
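
The "obtain the topology future under the read lock; if it is done, use its version, otherwise wait and remap" pattern can be sketched as follows. This is only an illustration: MapOnTopologySketch and its members are hypothetical names, and a plain CompletableFuture stands in for the real topology future:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Simplified illustration of mapping a cache operation onto a topology version.
public class MapOnTopologySketch {
    private final ReadWriteLock topLock = new ReentrantReadWriteLock();

    // Completed future = topology version is ready; the value is the (simplified) version number.
    private volatile CompletableFuture<Long> topReadyFut = CompletableFuture.completedFuture(1L);

    void mapOnTopology(Runnable operation) {
        CompletableFuture<Long> fut;

        topLock.readLock().lock();
        try {
            fut = topReadyFut; // snapshot the current topology future under the read lock
        }
        finally {
            topLock.readLock().unlock();
        }

        if (fut.isDone())
            operation.run(); // proceed on the version this future carries
        else
            fut.thenRun(() -> mapOnTopology(operation)); // wait for the exchange, then remap
    }

    // Called by the exchange: publishes a new, not-yet-completed topology future.
    void onExchangeStarted(CompletableFuture<Long> newTopFut) {
        topLock.writeLock().lock();
        try {
            topReadyFut = newTopFut;
        }
        finally {
            topLock.writeLock().unlock();
        }
    }
}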

Before the distributed exchange starts, we need to wait for all ongoing updates mapped to the previous topology version to finish. After the topology future is set, the operations that managed to obtain the previous topology version are the ones we call "ongoing"; all other operations asynchronously wait for the exchange future to finish, so we do nothing with them. They will proceed after the exchange future is finished.

Finishing the ongoing updates happens in the Partitions Release phase.

Partitions Release / Partitions Release Latch

Each update operation (atomic update, transaction) is associated with some top-level future that indicates the operation's completion. We collect all of these futures into one compound future (or multi-future) called the "Partitions release future", wait for its completion on all server nodes, and then proceed to the next phase of the exchange.
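
As a rough sketch (using CompletableFuture in place of Ignite's compound future, with hypothetical names), the partitions release future is just an "all of" over the completion futures of the ongoing operations:

import java.util.concurrent.CompletableFuture;

// Sketch of a "partitions release future": a compound future over all ongoing
// atomic/transaction completion futures visible on this node.
public class PartitionsReleaseSketch {
    static CompletableFuture<Void> partitionsReleaseFuture(CompletableFuture<?>... ongoingOps) {
        return CompletableFuture.allOf(ongoingOps);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> tx1 = new CompletableFuture<>();
        CompletableFuture<Void> atomicUpdate = new CompletableFuture<>();

        CompletableFuture<Void> releaseFut = partitionsReleaseFuture(tx1, atomicUpdate);
        releaseFut.thenRun(() -> System.out.println("All ongoing updates finished, exchange may proceed"));

        tx1.complete(null);
        atomicUpdate.complete(null); // now releaseFut completes and the exchange continues
    }
}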

However, such simple waiting for completion may not be enough.

There are 3 cases:

  1. A transaction future may not be top-level, so waiting for its completion doesn't guarantee that the whole transaction is finished. This happens because top-level futures can be created on client nodes (GridNearTxLocal), which don't perform partitions release.
  2. A top-level transaction future can be completed while the whole transaction is not finished yet. This can happen if PRIMARY_SYNC or FULL_ASYNC mode is set.
  3. The transaction future may never be completed. E.g. a user can start a PESSIMISTIC transaction, perform a cache put (which obtains a lock and sets the transaction topology version) and then do nothing.

For cases 1 and 2, a two-phase wait for transactions is introduced. In the first phase, we wait for all transaction futures visible locally on each of the server nodes to finish. After that, all nodes wait for a distributed countdown latch to complete. After the latch is released, we have the invariant that only transaction futures representing a commit from primary to backup can appear. So in the second phase we do the same thing as in the first phase, but without the latch. [IGNITE-7871]

For case 3, a transaction timeout on PME is introduced. If the transaction can't be completed within the specified timeout, it is forcibly rolled back during partitions release. [IGNITE-6827]
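
The two-phase wait from cases 1 and 2 can be sketched like this. A local CountDownLatch stands in for Ignite's distributed exchange latch, and collectLocalTxFutures is a hypothetical supplier of the transaction futures currently visible on the node:

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.function.Supplier;

// Sketch of the two-phase transaction wait introduced in IGNITE-7871.
public class TwoPhaseTxWaitSketch {
    static void waitTransactions(Supplier<Collection<CompletableFuture<?>>> collectLocalTxFutures,
                                 CountDownLatch distributedLatch) throws InterruptedException {
        // Phase 1: wait for all transaction futures currently visible on this node.
        collectLocalTxFutures.get().forEach(CompletableFuture::join);

        // Report phase 1 completion and wait until every server node has reported it too.
        distributedLatch.countDown();
        distributedLatch.await();

        // Phase 2: after the latch only primary->backup commit futures may appear; wait once more.
        collectLocalTxFutures.get().forEach(CompletableFuture::join);
    }
}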

WAL History reservation

WAL history reservation is needed for historical rebalance. On this step, each node looks up its current CheckpointHistory, which is held in memory, and finds the earliest available checkpoint that is present in the WAL as a CheckpointRecord for all cache groups. A CheckpointRecord contains information about all groups and partitions: which update counter for each partition had been seen when this checkpoint was created. Once found, the earliest CheckpointRecord is reserved. Reservation means that the WAL segment containing this CheckpointRecord, and all newer segments, will not be truncated (deleted) until the reservation is released. After the earliest CheckpointRecord is found and reserved, the earliest available update counter is calculated for all cache groups and partitions. These update counters are used as starting points for historical rebalance. If the joining node's latest update counter for some partition is greater than the earliest reserved counter on another node, the joining node can use that node as a supplier for historical rebalance and replay all updates from its latest update counter to the end of the WAL.
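
A schematic sketch of the reservation step is shown below. CheckpointRecordInfo and reserveEarliest are hypothetical names; Ignite's actual CheckpointHistory API and data structures differ:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Schematic sketch of WAL history reservation: walk the in-memory checkpoint history from
// oldest to newest, reserve the earliest checkpoint still present in the WAL, and take its
// per-partition update counters as the starting points for historical rebalance.
public class WalHistoryReservationSketch {
    static final class CheckpointRecordInfo {
        final boolean availableInWal;
        // "groupId:partitionId" -> update counter observed when the checkpoint was created
        final Map<String, Long> partitionCounters;

        CheckpointRecordInfo(boolean availableInWal, Map<String, Long> partitionCounters) {
            this.availableInWal = availableInWal;
            this.partitionCounters = partitionCounters;
        }
    }

    /** @return per-partition counters of the earliest reserved checkpoint, or an empty map. */
    static Map<String, Long> reserveEarliest(List<CheckpointRecordInfo> historyOldestFirst) {
        for (CheckpointRecordInfo cp : historyOldestFirst) {
            if (cp.availableInWal) {
                // Reservation: the WAL segment holding this CheckpointRecord and all newer
                // segments will not be truncated until the reservation is released.
                return new HashMap<>(cp.partitionCounters);
            }
        }
        return Map.of(); // nothing can be reserved - only full rebalance is possible
    }
}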

Database callback

After the partitions release phase is finished and there are no ongoing updates, we can start manipulating partitions. On this step, we need to restore partition states from page memory. Although restoring partition states happens before a node joins the cluster [IGNITE-9420] and is an idempotent operation, there is a case when this step is still needed: snapshot restore. During snapshot restore, we stop some caches and start them again. After the cache start, we need to re-create partitions and read their state from disk (IgniteCacheDatabaseSharedManager#beforeExchange).

After the existing partitions are attached to the topology, we need to create the remaining partitions according to the affinity assignment calculated in the init phase and initialize the partitions full map (GridDhtPartitionTopology#beforeExchange).

Single message sending

If a node is not the coordinator, this method results in sending a single map (GridDhtPartitionsSingleMessage). The single map is sent using the communication SPI (in a peer-to-peer manner).

This single message contains a partition map for each cache (GridDhtPartitionMap). It contains the owners of each partition for each cache group (a cache group id is mapped to a partition map). Following is an example from the log for 2 caches:

msg=GridDhtPartitionsSingleMessage 
[parts=
{1544803905=GridDhtPartitionMap [moving=0, top=AffinityTopologyVersion [topVer=9, minorTopVer=3], updateSeq=2, size=406], 
-2100569601=GridDhtPartitionMap [moving=0, top=AffinityTopologyVersion [topVer=9, minorTopVer=3], updateSeq=161, size=100]}

Phase 2. Building and sharing full state

The coordinator maintains a list of nodes to wait for a response from (N1, N2, N3 for example at pic.1).

Single messages collecting

Each node sends the state of its local partitions to the coordinator in a message called GridDhtPartitionsSingleMessage. This message contains several maps representing the state of partitions for all registered caches. The state includes the partition state, update counter and partition size. This information is needed for validation, affinity calculation, primary-backup assignment and rebalance. The coordinator of each exchange knows the set of server nodes that should send these single messages. When the coordinator has received all of them from all server nodes, it starts to analyze the cluster-wide state of partitions and performs affinity and partition manipulations.

Exchanges merge

Exchange merge is an optimization for the case when multiple topology events (node join / left) happen at around the same time. Instead of executing such exchanges sequentially, they can be merged into one [IGNITE-6124]. How it works:

When all single messages of the current exchange are received, the coordinator sequentially scans the exchange worker queue and finds all topology events that can be merged together. If a topology event requires a single message (node join), the coordinator will wait for it. If the next topology event in the queue is not supposed to be merged (e.g. it's a custom discovery event, or a node joins with new caches), the queue scan is stopped. All additional single messages from other joining nodes are placed here (GridDhtPartitionsExchangeFuture#mergedJoinExchMsgs); if another joining node hasn't sent its single message yet, an await counter is incremented (GridDhtPartitionsExchangeFuture#awaitMergedMsgs), and when the single message is received this counter is decremented. When this counter becomes zero, the coordinator finishes the exchange in the regular way, but with a result topology version equal to the freshest version it sees among the merged exchange futures. Discovery events from such merged exchanges are placed into the current exchange future and are used for further calculations. Merged exchanges are removed from the exchange queue on all nodes when the current exchange is finished.

Affinity calculation

On this step, we need to calculate the ideal and "real" affinity distribution of partitions on the coordinator, considering the state of partitions received from all other nodes. The ideal affinity distribution is controlled by the affinity function (Rendezvous by default). This function assigns primary and backup nodes (with a configurable backup factor) for each partition in all cache groups. The primary node always reflects the most actual state of data in each partition. The primary node is usually the leader for any operation on the partition, and it replicates its state to the backup nodes.

A partition on the primary node should always be in the OWNING state. In the case of a full and stable topology, the ideal distribution is equal to the real one. In the case of an unstable topology (nodes joining and leaving, baseline changes), the ideal affinity distribution does not always follow the invariant that the primary node has the partition in OWNING state, because the ideal affinity distribution doesn't take the state of the partitions into account; it just shows the desired affinity.

In this case, we need to place some backups that are in OWNING state into the primary role before rebalance is finished. After rebalance, such a temporary distribution is changed to the ideal one and all primary nodes are back in place, while the extra backups move to the RENTING state. This process is called Late Affinity Assignment.

There are 2 main strategies to calculate “real” affinity:

  1. Partitions availability - this is the general affinity calculation strategy that takes the state of partitions into account (a sketch of the idea follows after this list). It uses 2 principles:
     1. If the partition's primary by ideal assignment is in the OWNING state, use the ideal affinity distribution for this partition.
     2. If the partition's primary by ideal assignment is not in the OWNING state, take any partition owner and make it a temporary primary.

This strategy is used for almost all types of exchange: activation, baseline change and the node-left case.

  2. Node join with exchange merge - taking into account that the invariant "the primary partition must be in OWNING state" should hold from exchange to exchange, a joining node can simply place itself as a backup for the partitions assigned to it as primary by the ideal assignment, until rebalance is finished. The coordinator calculates this affinity assignment and includes it in the FullMessage sent to the joining node. This strategy takes less time and is used as an optimization of the general (partitions availability) strategy.
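
Below is a minimal sketch of the "partitions availability" principle from the list above. The class and method names are hypothetical, and the logic is a simplification of what CacheAffinitySharedManager#initAffinityBasedOnPartitionsAvailability does:

import java.util.List;
import java.util.Map;
import java.util.UUID;

// Sketch: keep the ideal primary if it already OWNs the partition, otherwise promote any
// current owner to a temporary primary until rebalance finishes.
public class AffinityAvailabilitySketch {
    enum State { OWNING, MOVING }

    static UUID choosePrimary(List<UUID> idealAssignment,         // ideal order: primary first
                              Map<UUID, State> partitionStates) { // partition state reported per node
        UUID idealPrimary = idealAssignment.get(0);

        if (partitionStates.get(idealPrimary) == State.OWNING)
            return idealPrimary; // the ideal distribution can be used as-is

        // Otherwise pick any node that actually owns the partition as a temporary primary.
        return partitionStates.entrySet().stream()
            .filter(e -> e.getValue() == State.OWNING)
            .map(Map.Entry::getKey)
            .findFirst()
            .orElse(idealPrimary); // no owners at all - the partition may become LOST
    }
}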

Validate partition states

On this step, we compare partition update counters and partition sizes between nodes that reported the partition in OWNING state. If there is a difference, it may signal data inconsistency problems in the cluster, and an appropriate warning is printed.

Assign partition states

On this step, we need to find outdated OWNING partitions (those having an update counter lower than others), move them into the MOVING state, and prepare them for rebalance after the exchange.

If a partition can’t be rebalanced using the WAL (@see WAL History reservation), asynchronous clearing is started after such a partition is changed to the MOVING state.

Usually, OWNING partitions with an outdated update counter come from joining nodes that were offline for some time.

This step is evaluated during cluster activation, baseline change or node join.
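
A hypothetical sketch of this step for a single partition is shown below; the real logic lives in the exchange future's partition state assignment code and is considerably more detailed:

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Sketch of "assign partition states" for one partition: owners whose update counter is
// behind the maximum are demoted to MOVING and will be rebalanced after the exchange.
public class AssignPartitionStatesSketch {
    enum State { OWNING, MOVING }

    static Map<UUID, State> demoteOutdatedOwners(Map<UUID, Long> updateCounterByOwner) {
        long maxCounter = updateCounterByOwner.values().stream()
            .mapToLong(Long::longValue)
            .max()
            .orElse(0L);

        Map<UUID, State> result = new HashMap<>();

        updateCounterByOwner.forEach((node, counter) ->
            result.put(node, counter < maxCounter ? State.MOVING : State.OWNING));

        return result;
    }
}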

Detect lost partitions

On this step, we need to find partitions that don't have at least one alive owner (at least one copy on some node in the OWNING state). Such partitions are marked as LOST; reading from and writing to such partitions is controlled by the partition loss policy. If the partition loss policy is IGNORE, such partitions are immediately moved to the OWNING state without any restrictions on read-write operations.

This step is evaluated only when a node failed or left the cluster, in place of the previous step (@see Assign partition states).
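
The detection rule can be sketched as follows (hypothetical names; the real implementation lives in the partition topology code):

import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

// Sketch of lost-partition detection: a partition with no alive OWNING copy is marked LOST.
public class DetectLostPartitionsSketch {
    enum State { OWNING, MOVING, LOST }

    /** @param states partition id -> (alive node id -> reported partition state). */
    static Set<Integer> detectLost(Map<Integer, Map<UUID, State>> states) {
        return states.entrySet().stream()
            .filter(e -> e.getValue().values().stream().noneMatch(s -> s == State.OWNING))
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet());
    }
}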

Full message preparing and sending

According to the received single maps and its own state, the coordinator builds the full map.

When the state of partitions and the affinity are determined, we need to notify all other server nodes that participated in the exchange (the set of participating server nodes is the same one the coordinator used when collecting single messages).

To send all necessary information, messages called GridDhtPartitionsFullMessage are used. Such a message contains the following important information:

  1. Cluster-wide partition states from all nodes. Located in GridDhtPartitionsFullMessage#parts.
  2. Actual update counters for all partitions. These update counters are set on partitions whose state is outdated relative to the cluster, to make them virtually in-sync during rebalance. Located in GridDhtPartitionsFullMessage#partCntrs2.
  3. Partitions that need to be cleared and fully rebalanced. Located in GridDhtPartitionsFullMessage#partsToReload.
  4. Nodes that have sufficient WAL history, so they can be suppliers for historical rebalance (GridDhtPartitionsFullMessage#partHistSuppliers).
  5. Affinity assignment for nodes that are joining the cluster (GridDhtPartitionsFullMessage#joinedNodeAff).
  6. Affinity assignment that needs to be applied on other server nodes instead of being calculated locally (this is like an affinity fetch) (GridDhtPartitionsFullMessage#idealAffDiff).

When all expected responses have been received, the coordinator sends the full map to the other nodes.

Following is an example of the 'Sending full partition map' log output for 2 caches:

msg=GridDhtPartitionsFullMessage [
 parts={
  1544803905=GridDhtPartitionFullMap [nodeId=1c6a3487-0ad2-4dc1-a69d-7cd84db00000, nodeOrder=1, updateSeq=6, size=5], 
  -2100569601=GridDhtPartitionFullMap [nodeId=1c6a3487-0ad2-4dc1-a69d-7cd84db00000, nodeOrder=1, updateSeq=102, size=5]}, 
 topVer=AffinityTopologyVersion [topVer=9, minorTopVer=3]

Partition states in full and single maps are encoded using bit sets for compression.
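
As a toy illustration of the idea (not Ignite's actual wire format), per-partition states can be kept in one bit set per state, with the partition id used as the bit index:

import java.util.BitSet;

// Toy illustration of compressing partition states into bit sets.
public class PartitionStateBitSetSketch {
    private final BitSet owning = new BitSet();
    private final BitSet moving = new BitSet();

    void markOwning(int partId) { owning.set(partId); moving.clear(partId); }
    void markMoving(int partId) { moving.set(partId); owning.clear(partId); }

    boolean isOwning(int partId) { return owning.get(partId); }

    public static void main(String[] args) {
        PartitionStateBitSetSketch map = new PartitionStateBitSetSketch();
        map.markOwning(0);
        map.markMoving(1);
        System.out.println("P0 owning: " + map.isOwning(0) + ", P1 owning: " + map.isOwning(1));
    }
}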

On receiving the full map for the current exchange, each node calls onDone() for this future. The listeners for the full map are then called (the order is not guaranteed).


Example of exchange and rebalancing for node join event

Preconditions

Example of a possible partition state for 3 nodes, 3 partitions, 1 backup per partition:

Node 1: Primary P1 - Owning
        Backup P2  - Owning
Node 2: Primary P2 - Owning
        Backup P3  - Owning
Node 3: Backup P1  - Owning
        Primary P3 - Owning

Each node knows the following:

  • local partition info - it is always up to date,
  • the partition state of all cluster nodes - this may lag behind the actual cluster state

The server node with the lowest order (the oldest alive node in the topology) is the coordinator (there is always exactly one such node). It is usually called 'crd' in code.

Let's suppose a new node is now joining (it will be Node 4).

All other nodes know the state of partitions as the coordinator observed it some time ago.

A partition mapping is a mapping of the form: UUID -> partition ID -> State
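
In code this view can be pictured as a nested map (a simplified, hypothetical stand-in for GridDhtPartitionFullMap, which maps node ids to GridDhtPartitionMap instances):

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Simplified stand-in for the partition mapping: nodeId -> partition id -> state.
public class PartitionMappingSketch {
    enum State { MOVING, OWNING, RENTING, EVICTED, LOST }

    public static void main(String[] args) {
        Map<UUID, Map<Integer, State>> fullMap = new HashMap<>();

        UUID node1 = UUID.randomUUID();
        Map<Integer, State> node1Parts = fullMap.computeIfAbsent(node1, id -> new HashMap<Integer, State>());
        node1Parts.put(1, State.OWNING); // Node 1: primary P1 - Owning
        node1Parts.put(2, State.OWNING); // Node 1: backup P2 - Owning

        System.out.println(fullMap);
    }
}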

PME and rebalancing 

Step 1. Discovery event triggers exchange

The topology version is incremented

Affinity generates a new partition assignment


For example, the Partition 3 backup is now assigned to the new Node 4 (data is to be moved to N4)

Step 2. Node 4: the Partition 3 backup is created on the new node in the Moving state:

Node 2: Primary P2 -  Owning
        Backup P3 -  Owning
...
Node 4:
        Backup P3 -  Moving

Node 2 does not discard its data and does not change the partition state, even though it does not own the partition by affinity on the new topology version.


Step 3. Node 4 starts to issue demand requests for cache data to any node which has the partition data.

The full-map exchange will most likely be completed before Node 4 rebalances all the data it will own (so Node 4's partition 3 state is still Moving in that full map).


Step 4. When all data is loaded (the last key is mapped), Node 4 locally changes the state to Owning (other nodes still think it is Moving).

Node schedules a background cluster notification.


Step 5. After some delay (timer), Node 4 sends a single message to the coordinator. This message has an absent ('null') exchange id.


Step 6. The coordinator sends an updated full map to the other nodes.

Step 7. Node 2 observes that

  • in the current topology there are 3 owners of the partition,
  • affinity says there should be 2,
  • and the current node is not an affinity node for the partition.

The partition can therefore be safely removed.

The partition state is changed to Renting locally (other nodes still think it is Owning).
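
The "is it safe to evict?" check from this step can be sketched as follows (illustrative names only):

import java.util.Set;
import java.util.UUID;

// Sketch of the eviction check: the partition has more owners than affinity requires
// and the local node is no longer an affinity node for it.
public class SafeEvictionCheckSketch {
    static boolean canEvict(Set<UUID> currentOwners, Set<UUID> affinityNodes, UUID localNodeId) {
        return currentOwners.size() > affinityNodes.size() // e.g. 3 owners while affinity expects 2
            && !affinityNodes.contains(localNodeId);       // and this node is not assigned by affinity
    }
}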


Step 8. After some delay (timer), Node 2 sends a single message to the coordinator.

Step 9. The coordinator sends an updated full map to the other nodes.


All nodes eventually see the Renting state for the P3 backup on Node 2.


SingleMessage with exchId=null is sent when a node updates local partitions' state and schedules a background cluster notification.

In contrast, when a partition map exchange happens, it is completed with exchId != null.


If more topology updates occur, more cluster nodes may be responsible for the same partition at a particular moment.

Clearing cache partition data immediately is not possible because of SQL indexes, which cover all of the node's partitions globally. Elements are removed one by one. Later the partition state becomes Evicted.

Exchange types

  • synchronous: comes from discovery events (real & custom); has to wait for transactions to finish
  • asynchronous: does not slow down operations

Synchronous Exchange

A DiscoveryEvent is sent to the cluster using the ring (DiscoverySpi).

A synchronous exchange is required when a primary partition is rebalanced to a new node.

Let's suppose for a second that the partition map exchange were not synchronous: there would be a possible case where 2 primaries exist in the Owning state (the new node has changed the state locally, but the old node has not received the update yet). In that case lock requests might be sent to different nodes, which could break ACID guarantees.

As a result, primary migration requires a global synchronous exchange. It should not run at the same time as transactions.

For protection, there is the Topology ReadWrite Lock:

  • transactions & atomic cache updates acquire the read lock - N holders are supported at the same time
  • an exchange acquires the write lock - only 1 holder is possible, excluding all transactions

Using a ReadWrite lock gives the semantics of "N transactions, or 1 exchange".
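
The "N operations or 1 exchange" semantics can be sketched with a plain java.util.concurrent ReadWriteLock (Ignite's actual implementation uses a striped read-write lock, so this is an illustration of the semantics only):

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch of the topology read-write lock semantics: many updates share the read lock,
// while an exchange takes the exclusive write lock.
public class TopologyLockSketch {
    private final ReadWriteLock topLock = new ReentrantReadWriteLock();

    void performCacheUpdate(Runnable update) {
        topLock.readLock().lock(); // many transactions / atomic updates may hold the read lock
        try {
            update.run();
        }
        finally {
            topLock.readLock().unlock();
        }
    }

    void performExchange(Runnable exchange) {
        topLock.writeLock().lock(); // the exchange is exclusive: no concurrent updates
        try {
            exchange.run();
        }
        finally {
            topLock.writeLock().unlock();
        }
    }
}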

Late Affinity Assignment

Late Affinity Assignment is an optimization that delays the switch of primary partitions after a topology change.

This feature is enabled by default since 2.0 and is the only possible option since 2.1.

On each topology change, for each started cache, a partition-to-node mapping is calculated using the {@link AffinityFunction} configured for the cache. When late affinity assignment mode is disabled, the new affinity mapping is applied immediately.

In late affinity assignment mode, if the primary node has changed for some partition, the current primary is not changed and the new primary is temporarily assigned as a backup. Later, when it is ensured that all new "ideal affinity" primaries are ready to become true primaries, the cluster performs the late affinity switch procedure: on a separate PME (triggered by CacheAffinityChangeMessage), primary assignments are recalculated to match the {@link AffinityFunction}. There are three cases which may require late affinity assignment:

  • Node join. When a node joins the cluster, none of its partitions can be primary. See {@link CacheAffinitySharedManager#initAffinityOnNodeJoin}.
  • BLT change. On a PME triggered by the baseline change command, primary partitions do not change their disposition. See {@link CacheAffinitySharedManager#onBaselineTopologyChanged}.
  • Cache start / cluster activation in persistent mode. In case a persisted partition is outdated, its state is reset to MOVING. The primary is always chosen from OWNING partitions. Unlike the node join case, if a node is partially outdated and some of its partitions are MOVING, its up-to-date partitions can still be primary. See {@link CacheAffinitySharedManager#initAffinityBasedOnPartitionsAvailability}.

The moment of the late affinity switch is controlled by the coordinator node. It stores information about all ongoing rebalancing processes in CacheAffinitySharedManager#waitInfo. When it becomes empty, the coordinator node sends an affinity change message (see {@link CacheAffinitySharedManager#checkRebalanceState}), which triggers the late affinity switch PME. In case all partitions on the joined node are up to date, such a PME is triggered right away after the topology change PME.

