Current state: Accepted

Discussion thread

JIRA: CASSANDRA-18330


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Cluster metadata in Cassandra comprises a number of disparate elements including, but not limited to, distributed schema, topology and token ownership. Following the general design principles of Cassandra, the mechanisms for coordinating updates to cluster state have favoured eventual consistency, with probabilistic delivery via gossip being a prime example. Undoubtedly, this approach has benefits, not least in terms of resilience, particularly in highly fluid distributed environments. However, this is not the reality of most Cassandra deployments, where the total number of nodes is relatively small (i.e. in the low thousands) and the rate of change tends to be low.

Historically, a significant proportion of issues affecting operators and users of Cassandra have been due, at least in part, to a lack of strongly consistent cluster metadata. In response to this, we propose a design which aims to provide linearizability of metadata changes whilst ensuring that the effects of those changes are made visible to all nodes in a strongly consistent manner. At its core, it is also pluggable, enabling Cassandra-derived projects to supply their own implementations if desired.

Probabilistic propagation of cluster state is suboptimal and not necessary, given the scale that Cassandra typically operates at. Not to mention it is (by definition) unreliable and, since there is no strict order for changes propagated through gossip, there can be no universal "reference" state at any given time. We aim to add a mechanism for establishing the order of state change events, and replace gossip as the primary mechanism of their dissemination in order to enable nodes which are participating in state-changing operations to detect when these intersect with other ongoing operations, such as reads and writes. The first two areas we plan to apply these changes to are Schema and Token Ownership. Establishing global order for the events requires us to add transactional semantics to changes in the state.

Token Ownership

Besides the fact that gossip propagation is slow and unreliable, using gossip for state changes leads to short-term replication factor violations. For example, a gossip-backed bootstrap process allows streaming to be started before there's a guarantee that any coordinator will make a write towards the bootstrapping replica; similarly, during bootstrap it is possible for the effects of a write operation, followed by a read to the same replica set, not to be visible. While the problem with streaming is a relatively minor reduction in durability guarantees, the problem with disagreeing read/write sets is a transient data loss, a consistency violation that likely goes unnoticed on a regular basis. Worse yet, there is no guaranteed upper bound on how far two coordinators may diverge from each other, which means that a coordinator may try to read from, or write to, replicas that do not own the range at all.

To work around some of the shortcomings of gossip, Cassandra has to make use of artificial throttling, using RING_DELAY, and manually ensure there are no concurrent operations on overlapping ranges.

To fix these problems, Cassandra has to support modifying the owning replicas for a token range safely and consistently, and be able to do this even in the face of widespread failures. This entails deciding the owners for a token range, while handling the following three problems:

  1. Making ownership changes swiftly and reliably
  2. Tolerating some number of unavailable nodes and even datacenters with no reduction in consistency, durability or availability guarantees
  3. Ensuring this information is available to all operations that interact with the range across the change

Concurrent with such ownership changes, client-driven read/write operations are continually ongoing and may be coordinated by any node. Any solution to the problem of token ownership must ensure that all operations respond correctly (i.e. involve enough replicas to satisfy read and write consistency for the given replication factor and consistency level) in the face of any ownership change that occurs whilst they are in flight.

In the immediate term, the planned changes described here will make range movements and replacements deterministic and safe. In the longer term, transactional cluster metadata will provide the foundational capabilities to be more flexible around replica placement, with the long term goal of automating much of the maintenance work involved in balancing clusters.

Schema Management

In Cassandra 4.x, schema changes are not ordered in any way. Rather, they are essentially built around the regular write path and utilise standard timestamped, LWW mechanisms for conflict resolution. This can lead to multiple issues when those changes race with each other.

Racing DROP X validations

Imagine the cluster has UDT named ud_type, and no tables currently using it. Then, two concurrent schema operations hit nodes A and B:

  1. A: CREATE TABLE foo (id int PRIMARY KEY, bar ud_type); Will succeed because the type is still present
  2. B: DROP TYPE ud_type; Will succeed because no tables are known to use the type

Outcome: at the next restart, nodes won’t start up again, failing to load table foo due to the missing type ud_type. Similar scenarios: drop of a UDF racing with creation of a UDA; racing validations between tables and views leading to broken and hard to recover states (e.g. DROP TABLE + CREATE VIEW).

Racing CREATE TABLE statements

This is a highly visible issue which we face frequently. CASSANDRA-5202 introduced time-based table UUIDs to prevent accidental reuse of sstables belonging to previously dropped tables with the same name. Unfortunately, that opened up a new issue: conflicting table IDs when racing to create the same table on multiple nodes or, most commonly, due to retries. Resolving this issue requires manual intervention (at minimum bouncing machines) and often involves unavoidable data loss, as one of the tables has to discard its data.

There are other problems that are theoretically solvable by other means, but that are going to be addressed by the introduction of transactional metadata, such as detecting divergent column sets during reads and writes, and avoiding corruption such as the read-repair corruption described in CASSANDRA-13004.


From the user perspective, implementing changes proposed in this CEP will:

  • completely eliminate transient and permanent data loss issues inherent to gossip propagation of membership changes
  • significantly improve Cassandra node bootstrap and startup time
  • significantly improve schema propagation time

From the operator perspective, implementing changes proposed in this CEP will:

  • make cluster shrinks, expansions, moving tokens, and replacing nodes quicker and more reliable
  • eliminate problems with gossip (partial propagation, divergence, delays)
  • make concurrent ring movements safe and painless

From the developer perspective, implementing changes proposed in this CEP will:

  • simplify code related to cluster membership (token metadata, gossip, bootstrap, replacement, decommission)
  • significantly improve test tooling around the distributed read/write path and cluster membership changes
  • open up opportunities for dynamic range movement and other cluster-wide features


Goals

  • Address non-deterministic propagation of cluster membership and range ownership:
    • Make cluster expansions and shrinks safe and amenable to automation
    • Support multiple concurrent additions, removals and replacements without compromising durability, correctness or availability
    • Eliminate problems, including potential data loss, caused by divergent views of the ring
  • Avoid unavailability related to schema changes
    • Eliminate race conditions in schema modifications (e.g. multiple CFID problem)
    • Eliminate schema mismatches between coordinators and replicas (e.g. drop/add column etc)
  • Improve the efficiency of schema propagation
    • Decouple schema propagation from persistence (i.e. mutations against schema tables).
    • Eliminate "migration storms" as nodes schedule schema pulls and pushes independently
    • Ensure nodes have full and correct schema before bootstrapping


Non-Goals

  • Introduction of external systems for the purposes of metadata management (zookeeper/etcd)
  • Specialisation of nodes in a cluster. Nodes may take on specific additional roles at times, but this is more like full/transient replica status than the namenode/datanode distinction in Hadoop, for instance.
  • Complete removal of gossip. We see utility, at least in the near term, in retaining gossip as both an input to failure detection and for the dissemination of transient and/or non-correctness impacting cluster metadata, such as RPC readiness, storage load, etc.

Out of Initial Scope

We envisage a transactional system of cluster metadata enabling many future enhancements which are out of scope for the initial development. These include:

  • The capability to stage complex sequences of events, present them to the operator/user and enable them to review and approve/reject them (e.g. an expansion plan to add x new instances to a cluster)
  • Ability to automatically perform re-sharding and range movements to rebalance based on CPU/QPS load
  • Deprecation of the token ring and ownership concepts
  • Managing other categories of persistent cluster metadata in the transactional system:
    • Credentials and Permissions
    • Repair History
    • Table truncation log
  • Safe migration to numeric table IDs rather than UUIDs (for lower-overhead marshaling)
  • Column identifiers independent of column name, enabling safe renaming and dropping of columns

Proposed Changes

In this document, when discussing data replication and read/write operations, we assume the use of replication settings and consistency levels that support basic levels of fault tolerance. Of course, nothing in this proposal precludes the use of either an RF or CL which provides "weaker" guarantees, or indeed access patterns like "write at ONE, read at ALL", but it also doesn't aim to address their inherent shortcomings.

The goal of this proposal is to replace gossip as the means of dissemination for cluster membership and data placement changes. The proposal specifies an alternative mechanism which provides strict linearizability of metadata changes whilst ensuring that the effects of those changes are made visible to all nodes in the cluster atomically.

  • Cluster Metadata Service
    • linearizes metadata updates from nodes
    • disseminates updates as they are committed
    • gates visibility of metadata changes
    • provides catch up / replay for lagging peers
  • Node Level Changes
    • metadata changes are applied strictly in order
    • data placement is static and calculated when changes applied
      • moves placement calculation off the hot path
      • includes pending range calculations
    • local metadata is immutable, no updates in place or requirements for locking


Data Range

Data for a specific keyspace and token range. A given data range may be replicated for reads and for writes independently, as long as any majority that can be collected from the read replica set and any majority that can be collected from the write replica set have at least one node in common.

Replica Group

The ownership group of a specific Data Range. Membership of a Replica Group includes all of what are currently known as "natural replicas". A given Data Range will have logically distinct RGs for reading and writing, though most of the time these will be identical.

Cluster Metadata Service

Maintains a log of events which modify cluster wide metadata. Members of the CMS are a subset of the nodes in the Cassandra cluster. This membership will be flexible and dynamic as it may grow to a certain configured or hard-coded size as the cluster grows, with membership updated when any of the previous members are decommissioned or unavailable for a prolonged period (see CMS Reconfiguration Protocol). Clients of the CMS submit metadata change events; the CMS determines whether each event should be executed or rejected based on the current state of the cluster. The CMS enforces a serial order for accepted events and proactively disseminates them to the rest of the cluster. The CMS also ensures that the effects of these events become visible in a logically consistent way, using the publication mechanism described in a following section (see Event Submission Protocol).


Epoch

An epoch is a monotonically increasing counter associated with an event in the metadata change log. Each event committed to the log by the CMS implies a new epoch and as such, each epoch simply represents a specific point in the linearized history of cluster metadata. Both epochs and the change log itself are immutable and once an event is assigned a particular order in the log, this cannot be modified.

Each message exchanged between any two nodes contains the sender's epoch. A node compares its own epoch with the epoch of the remote node and, if it finds itself behind, triggers the catch-up mechanism to bring itself up to date with the latest events.
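
As a minimal sketch of the epoch concept, an append-only log can assign each committed event the next epoch, and a node can compare epochs to decide whether it is behind. The names `LogEntry`, `MetadataLog` and `needs_catch_up` are hypothetical and chosen for illustration; they are not taken from the Cassandra codebase, and using 0 to mean "nothing committed yet" is an assumption of this sketch.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class LogEntry:
    """One immutable entry in the metadata change log (hypothetical shape)."""
    epoch: int
    event: str


class MetadataLog:
    """Illustrative append-only log; not the actual CMS implementation."""

    def __init__(self) -> None:
        self._entries: List[LogEntry] = []

    @property
    def current_epoch(self) -> int:
        # Epoch 0 stands for "no events committed yet" in this sketch.
        return self._entries[-1].epoch if self._entries else 0

    def append(self, event: str) -> LogEntry:
        # Each committed event implies exactly one new epoch.
        entry = LogEntry(self.current_epoch + 1, event)
        self._entries.append(entry)
        return entry

    def entries(self) -> Tuple[LogEntry, ...]:
        # Return a tuple so callers cannot reorder or extend the history.
        return tuple(self._entries)


def needs_catch_up(local_epoch: int, remote_epoch: int) -> bool:
    """A node is behind if a peer's message carries a higher epoch."""
    return remote_epoch > local_epoch
```

The frozen dataclass mirrors the statement that an entry's position in the log cannot be modified once assigned.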

Mapping Cluster Operations to Metadata Transitions (Events)

Cluster level operations, such as adding, removing or replacing nodes, may be decomposed into one or more steps which are executed sequentially to modify cluster metadata. Operations on disjoint replica groups may proceed concurrently as long as this does not violate invariants required by the various replication strategies employed.

Importantly, nodes in the replica group proceed through these sequences in unison. Each step is only triggered once a majority of the participating nodes have acknowledged the preceding step to the CMS.


Adding a new node to a cluster is decomposed into four logical operations.

  1. Split the ranges containing the new node's tokens
  2. Assign the new node as a write replica for those ranges
  3. Assign the new node as a read replica for those ranges. At the same time, any node which was previously a replica, but which is ceasing to be, is removed as a read replica
  4. Finally, the node giving up the range stops acting as a write replica (**)

(**) It may seem counterintuitive that the node giving up the range is still being written to even after we've stopped reading from it. This is done in order to guarantee that by the time we stop writing to that node, there is no coordinator that may attempt to read from it without having learned about at least the epoch in which it ceased to be part of the read set. In other words, we have to keep writing while there's any chance there might be a reader.

Moving on to the next step always requires a majority of participants (meaning the superset of nodes in the pre-bootstrap and post-bootstrap states) to learn about the epoch in which the previous change is enacted.

Between steps 2 and 3, the new node performs bootstrap, streaming required data from peers. Only when bootstrap is successfully completed can step 3 be performed.

For example, if we start with (a subset of) a ring where node A has token 100, B has token 200 and C has token 300, and a single keyspace KS with RF=2, then both the read and write placements for KS will include:

    Read Placements           Write Placements
    (0, 100]      : {A,B}     (0, 100]      : {A,B}
    (100, 200]    : {B,C}     (100, 200]    : {B,C}

If we then decide to add a new node X with token 150, step 1 is to split the existing ranges containing 150 without changing any ownership. At this point, both the read and write placements will contain:

    Read Placements           Write Placements
    (0, 100]      : {A,B}     (0, 100]      : {A,B}
    (100, 150]    : {B,C}     (100, 150]    : {B,C}
    (150, 200]    : {B,C}     (150, 200]    : {B,C}

In step 2, the new node is added to the write groups for the ranges it is acquiring. After this step, only the write placement is modified with the read placement unchanged:

    Read Placements           Write Placements
    (0, 100]      : {A,B}     (0, 100]      : {A,B,X}
    (100, 150]    : {B,C}     (100, 150]    : {B,C,X}
    (150, 200]    : {B,C}     (150, 200]    : {B,C}

Once X completes bootstrapping, step 3 can be executed, adding X to the corresponding read groups to replace nodes which are no longer going to serve reads for those ranges. At this point the read and write placements will contain:

    Read Placements           Write Placements
    (0, 100]      : {A,X}     (0, 100]      : {A,B,X}
    (100, 150]    : {X,B}     (100, 150]    : {B,C,X}
    (150, 200]    : {B,C}     (150, 200]    : {B,C}

Finally, step 4 is performed, removing the write replicas that have been superseded by X.

    Read Placements           Write Placements
    (0, 100]      : {A,X}     (0, 100]      : {A,X}
    (100, 150]    : {X,B}     (100, 150]    : {X,B}
    (150, 200]    : {B,C}     (150, 200]    : {B,C}
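
The four bootstrap steps above can be replayed with a small sketch. The helper names (`split`, `add_node`, `remove_nodes`, `bootstrap_example`) are hypothetical, and the mapping of which existing replica X supersedes in each range is read directly off the tables rather than derived from a replication strategy.

```python
from typing import Dict, FrozenSet, List, Set, Tuple

Range = Tuple[int, int]                  # (start, end], as in the tables above
Placement = Dict[Range, FrozenSet[str]]  # range -> replica group


def split(placement: Placement, token: int) -> Placement:
    """Step 1: split the range containing `token`; ownership is unchanged."""
    result: Placement = {}
    for (start, end), replicas in placement.items():
        if start < token < end:
            result[(start, token)] = replicas
            result[(token, end)] = replicas
        else:
            result[(start, end)] = replicas
    return result


def add_node(placement: Placement, ranges: Set[Range], node: str) -> Placement:
    """Add `node` to the replica group of every range in `ranges`."""
    return {r: (reps | {node} if r in ranges else reps)
            for r, reps in placement.items()}


def remove_nodes(placement: Placement, leaving: Dict[Range, str]) -> Placement:
    """Remove, per range, the replica named in `leaving`."""
    return {r: (reps - {leaving[r]} if r in leaving else reps)
            for r, reps in placement.items()}


def bootstrap_example() -> List[Tuple[Placement, Placement]]:
    """Replay the example: X (token 150) joins; returns (reads, writes) per step."""
    initial = {(0, 100): frozenset("AB"), (100, 200): frozenset("BC")}
    # Which existing replica X supersedes in each range (taken from the tables).
    superseded = {(0, 100): "B", (100, 150): "C"}
    gained = set(superseded)

    reads, writes = split(initial, 150), split(initial, 150)        # step 1
    history = [(reads, writes)]
    writes = add_node(writes, gained, "X")                          # step 2
    history.append((reads, writes))
    # X streams data from its peers here; step 3 waits for that to finish.
    reads = remove_nodes(add_node(reads, gained, "X"), superseded)  # step 3
    history.append((reads, writes))
    writes = remove_nodes(writes, superseded)                       # step 4
    history.append((reads, writes))
    return history
```

Each function returns a new mapping instead of mutating its input, which loosely reflects the proposal's point that local metadata is immutable and placements are recalculated when a change is applied.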

Currently, Cassandra does not automatically clean up unowned ranges after bootstrap, so C will still hold data for (100, 150] on disk even though it is no longer a replica for that range. This is an implementation detail which could be revisited, but is not planned at this time.

An attentive reader will already have noticed that such a multi-step process may get interrupted at any point. To avoid complex replay procedures and involved interplay with log truncation and snapshotting, cluster metadata simply holds the pending steps that remain to be executed for each node. This way, if the node fails at any point during a multi-step process, all it needs to do is get up to date with the latest version of the cluster metadata, and it will be able to safely continue the process.


Reducing cluster size by decommissioning is conceptually just the inverse of expansion, again comprising four logical steps:

  1. Assign the nodes taking over the leaving node's ranges as write replicas
  2. Assign the nodes taking over the ranges as read replicas, whilst at the same time removing the leaving node from the read replica groups
  3. Unify the read and write placements, removing the leaving node from the write groups
  4. Merge any contiguous ranges which now have identical replica groups

Between steps 1 and 2, the leaving node performs unbootstrap, streaming its data to the peers which are taking over those ranges. Only after this is completed can the process move on to step 2.

As an illustration, to decommission the node X added in the bootstrap example, we start with the following identical read and write placements:

    Read Placements           Write Placements
    (0, 100]      : {A,X}     (0, 100]      : {A,X}
    (100, 150]    : {X,B}     (100, 150]    : {X,B}
    (150, 200]    : {B,C}     (150, 200]    : {B,C}

The first step in the process adds the nodes which will be taking over ownership of the ranges currently held by X to the write placements. The read placements are unchanged.

    Read Placements           Write Placements
    (0, 100]      : {A,X}     (0, 100]      : {A,X,B}
    (100, 150]    : {X,B}     (100, 150]    : {X,B,C}
    (150, 200]    : {B,C}     (150, 200]    : {B,C}

At this point X will unbootstrap, streaming its data to B and C as required. When this streaming is complete, step 2 of the process can be executed. This updates only the read placement, adding the new replicas and removing X.

    Read Placements           Write Placements
    (0, 100]      : {A,B}     (0, 100]      : {A,X,B}
    (100, 150]    : {B,C}     (100, 150]    : {X,B,C}
    (150, 200]    : {B,C}     (150, 200]    : {B,C}

Once complete, the write placements can be updated, removing X.

    Read Placements           Write Placements
    (0, 100]      : {A,B}     (0, 100]      : {A,B}
    (100, 150]    : {B,C}     (100, 150]    : {B,C}
    (150, 200]    : {B,C}     (150, 200]    : {B,C}

Finally, contiguous ranges with identical replica groups can be merged. This is essentially an optimisation and isn't strictly necessary, but it is useful for now in maintaining the historical view of placement as a token ring segmented according to each node's assigned tokens. After merging, the read and write placements contain:

    Read Placements           Write Placements
    (0, 100]      : {A,B}     (0, 100]      : {A,B}
    (100, 200]    : {B,C}     (100, 200]    : {B,C}
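
The final merge step can be sketched as a single pass over the ranges in token order. The function name `merge_contiguous` is hypothetical, and the sketch assumes non-wrapping ranges like those in the example; it does not handle a range that wraps around the end of the ring.

```python
from typing import Dict, FrozenSet, Optional, Tuple

Range = Tuple[int, int]                  # (start, end]
Placement = Dict[Range, FrozenSet[str]]  # range -> replica group


def merge_contiguous(placement: Placement) -> Placement:
    """Merge adjacent ranges whose replica groups are identical."""
    merged: Placement = {}
    prev: Optional[Range] = None  # the most recently emitted range
    for start, end in sorted(placement):
        replicas = placement[(start, end)]
        if prev is not None and prev[1] == start and merged[prev] == replicas:
            # Same owners and no gap: extend the previous range.
            del merged[prev]
            prev = (prev[0], end)
        else:
            prev = (start, end)
        merged[prev] = replicas
    return merged
```

Applied to the placements after X has been removed, this yields the two ranges shown in the last table.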

All operations which affect cluster membership and token ownership (remove/replace/assassinate node, move token, etc) can be similarly decomposed into multiple steps which can be safely executed and made visible.


Event Submission Protocol

If a node wishes to initiate a change to cluster metadata, it sends a commit request to a member of the CMS. The CMS first verifies that the proposed change would be valid if all previously accepted and committed changes have been applied.

For example, if the CMS has previously committed a change to decommission a node, a subsequent proposal to replace that node should be rejected as invalid. Likewise, if an event which performs a DROP TABLE t schema modification has been committed, a subsequent event which performs an ALTER TABLE t command should be rejected.

The CMS can also decline to permit events which may violate consistency or durability invariants. For instance, a decommission event which would result in a state whereby required replication factors could not be satisfied may be rejected. More complex examples of operations which are unsafe for concurrent execution can easily be defined. Concurrent range movements, for instance, may only be permitted where there is no overlap between the affected ranges. The CMS accommodates this by keeping track of in flight operations and rejecting only those subsequent requests that would interfere with them.

If an event is both valid and permitted, the CMS then attempts to append it to the change log, assigning it an epoch and, ultimately, an immutable position in the change history. Appending to the log requires consensus between a quorum of CMS members and will initially be performed using Cassandra's existing Paxos implementation. Should a failure to gain consensus occur, perhaps due to a competing proposal being appended to the log first, the process is retried. If the change request is still valid according to the state of cluster metadata after the updates accepted in the meantime have been applied, the CMS retries the log append. If those updates have rendered the proposal invalid, then it is rejected and an appropriate response is returned to the submitter.
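
The validate, append and retry cycle described above can be sketched in a single process. `SketchCMS` and its methods are hypothetical; `try_append` merely stands in for the consensus round (it is not Paxos), and only table-level schema events are modelled.

```python
from typing import List, Set, Tuple

Event = Tuple[str, str]  # (kind, table name), e.g. ("DROP_TABLE", "t")


class SketchCMS:
    """Hypothetical, single-process stand-in for the CMS commit path."""

    def __init__(self) -> None:
        self.log: List[Event] = []  # an event at index i has epoch i + 1

    def tables_at(self, epoch: int) -> Set[str]:
        """Replay the first `epoch` events to derive the set of live tables."""
        tables: Set[str] = set()
        for kind, name in self.log[:epoch]:
            if kind == "CREATE_TABLE":
                tables.add(name)
            elif kind == "DROP_TABLE":
                tables.discard(name)
        return tables

    def is_valid(self, event: Event, epoch: int) -> bool:
        kind, name = event
        exists = name in self.tables_at(epoch)
        # CREATE needs the table to be absent; other kinds need it present.
        return not exists if kind == "CREATE_TABLE" else exists

    def try_append(self, event: Event, expected_epoch: int) -> bool:
        """Stand-in for consensus: fails if the log has already moved on."""
        if len(self.log) != expected_epoch:
            return False
        self.log.append(event)
        return True

    def commit(self, event: Event) -> int:
        """Return the assigned epoch, or raise ValueError if rejected."""
        while True:
            epoch = len(self.log)
            if not self.is_valid(event, epoch):
                raise ValueError(f"rejected at epoch {epoch}: {event}")
            if self.try_append(event, epoch):
                return epoch + 1
            # A competing proposal won the slot; re-validate and retry.
```

With this sketch, committing `("CREATE_TABLE", "t")` and then `("DROP_TABLE", "t")` succeeds, after which `("ALTER_TABLE", "t")` is rejected, matching the DROP/ALTER example earlier in this section.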

Following a successful submission, the new log entry, comprising the change event itself along with its epoch and a client-supplied identifier, is asynchronously replicated to all known members of the cluster. This replication is not necessary to ensure correctness and is primarily a means of minimising the level of temporary divergence between nodes.

However, once committed to the log, the CMS must ensure that any material change to cluster metadata logically takes effect across the cluster in a way that preserves advertised replication factor and quorum consistency. How this is achieved is covered by the consistency guarantees.

Consistency Guarantees

A totally ordered log and sequential propagation of metadata change events are important components of a strongly consistent system. However, alone they are insufficient to provide a guarantee that every read or write operation which overlaps in time with a metadata change will see a consistent view of the metadata. To simplify reasoning about epochs, we always operate under the assumption that any epoch committed to the log can be used by any node, and that there is no mechanism to prevent or delay the visibility of its effects in any way. But if any epoch can be visible to any node, we need to guarantee lower bounds for visibility of epochs in some way.

Take the example of topology changes; to ensure the correctness of an operation which modifies the membership of any Replica Group, the effects of each step of the operation must be visible to a majority of the current and proposed members of the RG before the next step is executed by any of the group. That is, if the makeup of a RG of a range acquired by X is changing from {A,B,C} (original set of replicas) to {B,C,X} (replicas after bootstrap is complete), each step must be acknowledged by at least 3 of {A,B,C,X} before the next step can be started.

We can illustrate using the bootstrap example above, where X is joining and taking over some ranges previously replicated by A and B. For simplicity, we will only consider the changes to the replica set for the range for which X is going to be a primary replica.

To recap, the 4 steps of the operation are:

  1. Epoch 100: Split the ranges containing X's token

  2. Epoch 101: Assign X as a write replica for the range it is gaining (read: A,B,C, write: A,B,C,X)

  3. Epoch 102: Assign X as a read replica for the range, whilst also removing A as read replica (read: B,C,X, write: A,B,C,X)

  4. Epoch 103: A stops acting as a write replica for the range it is giving up (read: B,C,X, write: B,C,X)

Given the asynchronous propagation of metadata changes, it would be entirely possible for successfully submitted change events to fail to reach a majority of affected replicas A,B,C,X, before other nodes start to receive them. This can lead to conditions which violate consistency, unless we guard progress between steps by waiting for a majority acknowledgement.

Without such a guard, it would be possible for two coordinators to learn about epochs 101 (Coordinator 1) and 103 (Coordinator 2) independently from each other, which could lead to the following sequence of events:

  • Coordinator 1 makes a read request, and attempts to collect a majority of read replicas A,B,C (read replicas for the range at epoch 101), and collects A,B
  • Coordinator 2 makes a write request, and attempts to collect a majority of write replicas B,C,X (write replicas for the range at epoch 103), and collects C,X

Both coordinators return success responses to their clients, but the write from Coordinator 2 has only propagated to C and X. This would mean that a write that Coordinator 2 has made will be (at very least temporarily) invisible.

An even worse scenario is imaginable if Coordinator 3 is still at epoch 100, and Coordinator 4 is already at epoch 102:

  • Coordinator 3 makes a write request, and attempts to collect a majority of write replicas A,B,C (write replicas for the range at epoch 100), and collects A,B
  • Coordinator 4 makes a read request, and attempts to collect a majority of read replicas B,C,X (read replicas for the range at epoch 102), and collects C,X

Just like in the first example, both coordinators return success responses to their clients, but the write from Coordinator 3 has only propagated to A and B. A is relinquishing the range in question, so imminently it will cease to be a valid replica. Coordinators of future requests will be able to obtain quorums containing only C and X, meaning that if B becomes unavailable the write from Coordinator 3 may be permanently lost.

If we gate the progression from step to step for each range on acknowledgements by a majority of A,B,C,X the problem can be avoided. That is, until a majority of replicas have observed epoch 100, we do not attempt to submit 101; and until a majority of replicas have observed 101 we do not attempt to submit 102, etc.

This way, any two majorities overlap by at least one node. This makes the RG itself the source of truth regarding its composition, which ensures that any node with an outdated view of the RG can be informed of this at read or write time and can take appropriate action. How communicating nodes utilise this to ensure consistency, correctness and atomicity of metadata changes is detailed in read/write path integration.
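
The two failure scenarios can be checked by brute force over the replica sets listed for epochs 100 to 103. This is only an enumeration of this specific example, not a proof of the protocol; the names `PLACEMENTS`, `majorities` and `always_overlap` are hypothetical.

```python
from itertools import combinations
from typing import Dict, FrozenSet, List, Tuple

# (read replicas, write replicas) per epoch, copied from the four steps above.
PLACEMENTS: Dict[int, Tuple[str, str]] = {
    100: ("ABC", "ABC"),
    101: ("ABC", "ABCX"),
    102: ("BCX", "ABCX"),
    103: ("BCX", "BCX"),
}


def majorities(replicas: str) -> List[FrozenSet[str]]:
    """All minimal majorities (size floor(n/2) + 1) of a replica set."""
    size = len(replicas) // 2 + 1
    return [frozenset(c) for c in combinations(replicas, size)]


def always_overlap(read_epoch: int, write_epoch: int) -> bool:
    """True if every read majority at `read_epoch` shares a node with
    every write majority at `write_epoch`."""
    reads = majorities(PLACEMENTS[read_epoch][0])
    writes = majorities(PLACEMENTS[write_epoch][1])
    # Checking minimal majorities is enough: larger quorums only add nodes.
    return all(r & w for r in reads for w in writes)
```

In this example, coordinators whose views are at most one epoch apart always obtain intersecting quorums, whereas a read at epoch 101 against a write at epoch 103, or a read at epoch 102 against a write at epoch 100, can miss each other, which corresponds to the two scenarios described above.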

Unfortunately, both problems described in this section are possible with the current gossip implementation.

Read/Write Path Integration

Exchanging the latest observed epoch during routine internode communication is the key to maintaining a consistent view of cluster metadata across all nodes.

During a write, the coordinator constructs a replica plan based on the cluster metadata that is known to it at the time. The mutation messages that the coordinator sends to those replicas include the epoch associated with the current metadata. This can be optimised to send the last epoch which had a material effect on the execution of the mutation, i.e. the most recent event to modify the Replica Group for the target Data Range, or the metadata for the target Keyspace and/or Table.

On receiving the mutation, each replica is able to compare that version with its own local metadata and detect any divergence. In the simple case where the replica is lagging behind the coordinator, it can catch up before processing the message (see catching up). In cases where the replica's version is ahead of the coordinator, it can determine if it is still a member of the RG and either respond with an error or continue to process the write. In either case, the replica's response to the coordinator will include the replica's metadata version.
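
A replica's handling of the epoch carried by a mutation might look like the following sketch. All names are hypothetical, the two callables stand in for a metadata lookup and the catch-up mechanism, and checking membership after catching up (as well as when the replica is ahead) is a simplifying assumption of this sketch.

```python
from enum import Enum
from typing import Callable, FrozenSet, Tuple


class Outcome(Enum):
    APPLIED = "applied"
    REJECTED_NOT_REPLICA = "rejected: not a replica at this epoch"


def handle_mutation(
    node: str,
    local_epoch: int,
    coordinator_epoch: int,
    write_replicas_at: Callable[[int], FrozenSet[str]],
    catch_up: Callable[[int], int],
) -> Tuple[Outcome, int]:
    """Return (outcome, replica_epoch); the epoch is always sent back."""
    if local_epoch < coordinator_epoch:
        # The replica is lagging: catch up before processing the message.
        local_epoch = catch_up(coordinator_epoch)
    # Decide using the replica's own, most recent view of the replica group.
    if node not in write_replicas_at(local_epoch):
        return Outcome.REJECTED_NOT_REPLICA, local_epoch
    return Outcome.APPLIED, local_epoch
```

For instance, using the write replica sets from the bootstrap recap, node A at epoch 103 would reject a mutation sent by a coordinator still at epoch 101, and the epoch 103 in its response would tell that coordinator it needs to catch up.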

As the coordinator receives responses from the replicas, it compares the metadata versions with its own, catching up if necessary. Having received enough responses to satisfy the consistency level, it verifies that the initial replica plan is still valid according to the up to date metadata and that it remains sufficient for the required consistency level. If not, the coordinator fails the request; currently this results in the client receiving an error response notifying that the required consistency was not achieved. It would be possible to improve this to enable the coordinator to recompute the replica plan and retry the operation.

In previous versions of Cassandra, with gossip based propagation, such a change in replica group would go completely unnoticed, resulting in the coordinator responding to a request with a success while it, in fact, has not achieved the promised consistency level. It is trivial to retry coordinator requests or to collect additional replicas to satisfy consistency and since this proposal focuses on the tooling for detecting any under-replication, we leave the actual fix out as a rather simple implementation detail.

Catching Up

Catching up can be done simply by requesting the sequence of log entries with a greater epoch than any previously seen by the node. As the log is immutable and totally ordered, this request can be made to any peer as the results must be consistent, if not exhaustive. Peers themselves may be lagging behind the true tail of the log, but this is perfectly acceptable, as it is impossible to propagate changes to all participants simultaneously, and we achieve correctness by means other than synchrony.

For instance, if during the course of performing a read or write, a coordinator learns from the replicas of a higher epoch than it has seen previously, it only needs to catch up to that specific epoch. In this case, requesting the replay from the replicas is sufficient. Of course, the CMS itself may be able to provide a more up to date view of the log, if necessary.
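
Catch-up by replay can be sketched as follows, with logs represented as ordered lists of (epoch, event) pairs. The function names are hypothetical, and the sketch assumes each peer's log is sorted by epoch and that epoch 0 means "nothing seen yet".

```python
from typing import List, Sequence, Tuple

Entry = Tuple[int, str]  # (epoch, event)


def entries_after(peer_log: Sequence[Entry], epoch: int) -> List[Entry]:
    """What a peer returns when asked for everything newer than `epoch`."""
    return [entry for entry in peer_log if entry[0] > epoch]


def catch_up(local_log: List[Entry], peer_log: Sequence[Entry]) -> int:
    """Append the peer's newer entries in order; return the new local epoch."""
    local_epoch = local_log[-1][0] if local_log else 0
    for epoch, event in entries_after(peer_log, local_epoch):
        # Entries are applied strictly in order, so a gap is an error here.
        if epoch != local_epoch + 1:
            raise ValueError(f"gap in log: expected {local_epoch + 1}, got {epoch}")
        local_log.append((epoch, event))
        local_epoch = epoch
    return local_epoch
```

Because the log is immutable and totally ordered, replaying from a peer that is itself behind still moves the node forward consistently; a later replay from a more up-to-date peer simply appends the remainder.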

Combining what we learned in catching up, consistency guarantees, and read/write path integration, it is apparent that

for any range and any majority of nodes that currently replicate or have previously replicated it, a coordinator that is lagging behind by an arbitrary number of epochs, will not be able to collect a quorum for read or write that is inconsistent with a quorum obtained using metadata that is up to date with the latest epoch affecting replication of that range.

CMS Reconfiguration Protocol

A new (or newly upgraded) cluster is initialised with a single CMS node, which can either be started with a flag, or some other configuration option. This approach was preferred to avoid complex try-and-backoff initialisation protocols.

Nodes can be added to and removed from the CMS manually, or by using a strategy that scales the CMS up and down to a certain size depending on the total cluster size. For example, 1 CMS node for a cluster size up to 2; 3 CMS nodes in a cluster of up to 5 nodes, 5 CMS nodes in a cluster of up to 100 nodes, and 10 CMS nodes thereafter. The CMS will detect changes in the cluster and will scale itself up and down automatically.
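
The example sizing strategy can be written as a small pure function. The function name target_cms_size is hypothetical and the thresholds are simply the example figures quoted above; an actual strategy may well use different values.

```python
def target_cms_size(cluster_size: int) -> int:
    """Map total cluster size to a CMS size, using the example thresholds."""
    if cluster_size < 1:
        raise ValueError("a cluster has at least one node")
    if cluster_size <= 2:
        return 1
    if cluster_size <= 5:
        return 3
    if cluster_size <= 100:
        return 5
    return 10


# Sizes the CMS would scale to as a cluster grows.
sizes = {n: target_cms_size(n) for n in (1, 2, 3, 5, 6, 100, 101)}
```

Because the result depends only on the cluster size, every node that has caught up to the same epoch would compute the same target.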

Reconfiguration itself uses a process analogous to "regular" bootstrap and also uses Paxos as a linearizability mechanism, except that there is no concept of "token" ownership in the CMS; all CMS nodes own the entire range from MIN to MAX token. This means that during bootstrap, we do not have to split ranges, or have some nodes "lose" a part of the ring, and adding a node to the ownership group is as simple as:

  1. Add the node to the write replica set for distributed cluster metadata keyspace
  2. Add the node to the read replica set for distributed cluster metadata keyspace

After step 1 has been acknowledged by a majority of CMS nodes, the entire event log is streamed to the node joining the CMS. After step 2 is done, Paxos repair is executed to make sure that not only the data but also the Paxos state is up to date.

If a CMS node is decommissioned, it first has to relinquish its ownership of cluster metadata, which may entail transfer of ownership to some other node (in other words, it may have to wait until another node becomes a full member of the CMS before it can stop participating in the CMS itself).

Since divergence over the state of CMS membership cannot grow larger than a single epoch (i.e. a single node can be either added to or removed from either read, write set, or both, in a rare event of CMS node assassination), any two read or write quorums will have overlap, so the reconfiguration protocol does not require additional safety conditions such as epoch visibility on Paxos commit, etc.
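
The overlap claim can be checked exhaustively for small memberships. The sketch below is illustrative only: majorities and always_overlap are hypothetical helpers, and a quorum is assumed to be a simple majority of the membership. It compares a three-node CMS with views that differ by one node and by two nodes.

```python
from itertools import combinations
from typing import FrozenSet, Iterable, Set


def majorities(members: Iterable[str]) -> Set[FrozenSet[str]]:
    """Every subset that is exactly a bare majority of members."""
    ordered = sorted(members)
    size = len(ordered) // 2 + 1
    return {frozenset(c) for c in combinations(ordered, size)}


def always_overlap(old: Iterable[str], new: Iterable[str]) -> bool:
    """True if every majority of old intersects every majority of new."""
    # Checking bare majorities is enough: larger quorums are supersets.
    return all(q1 & q2 for q1 in majorities(old) for q2 in majorities(new))


old = {"a", "b", "c"}

# Views that differ by a single membership change (one epoch apart).
one_added_safe = always_overlap(old, old | {"d"})
one_removed_safe = always_overlap(old, old - {"c"})

# Views that differ by two additions: {a, b} and {c, d, e} are disjoint.
two_added_safe = always_overlap(old, old | {"d", "e"})
```

The two-addition case fails, which illustrates why limiting divergence to a single membership change per epoch is what keeps the protocol safe without extra conditions.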

Snapshotting Metadata Log

It is intended that the log of metadata change events maintained by the CMS will be periodically snapshotted, with point-in-time representations of the cluster metadata being created. This will allow new nodes joining the cluster and nodes which are lagging far behind (perhaps through prolonged downtime or partition) to catch up more quickly by reducing the amount of replay required.

Snapshots will be triggered via an event in the metadata log, so non-CMS nodes can perform and persist them independently. This may provide useful redundancy for dealing with catastrophic failures of the CMS (see Failure Recovery).

On-demand snapshotting will also be supported, along with log truncation and archival.
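
To illustrate how a snapshot shortens replay, here is a toy model in which the cluster metadata is reduced to a set of member names. The event kinds (JOIN, LEAVE, SNAPSHOT) and the names Snapshot, apply_event and replay are all hypothetical; the real metadata and event types are far richer.

```python
from dataclasses import dataclass
from typing import FrozenSet, List, Tuple

Event = Tuple[int, str, str]  # (epoch, kind, node); the kinds are hypothetical


@dataclass(frozen=True)
class Snapshot:
    epoch: int                 # last epoch folded into this snapshot
    members: FrozenSet[str]    # toy stand-in for the full cluster metadata


def apply_event(members: FrozenSet[str], event: Event) -> FrozenSet[str]:
    _, kind, node = event
    if kind == "JOIN":
        return members | {node}
    if kind == "LEAVE":
        return members - {node}
    return members  # e.g. a snapshot-trigger marker changes no state


def replay(base: Snapshot, log: List[Event]) -> Snapshot:
    """Apply only the entries newer than the snapshot, in epoch order."""
    members, epoch = base.members, base.epoch
    for event in sorted(log):
        if event[0] > epoch:
            members, epoch = apply_event(members, event), event[0]
    return Snapshot(epoch, members)


log: List[Event] = [
    (1, "JOIN", "n1"), (2, "JOIN", "n2"), (3, "JOIN", "n3"),
    (4, "SNAPSHOT", ""),            # trigger recorded in the log itself
    (5, "LEAVE", "n2"), (6, "JOIN", "n4"),
]
EMPTY = Snapshot(0, frozenset())

at_epoch_4 = replay(EMPTY, log[:4])         # what any node persists at epoch 4
from_snapshot = replay(at_epoch_4, log[4:]) # only the two newer entries are needed
from_scratch = replay(EMPTY, log)           # full replay gives the same result
```

Since the snapshot is triggered by a log entry, every node takes it at the same epoch, so a snapshot obtained from any node is a valid starting point for replaying the remaining entries.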

Failure Recovery

There are a number of failure scenarios that operators may need to deal with. These include:

  • Catastrophic failures of the CMS nodes; e.g. all, or a majority of, CMS nodes are temporarily or permanently unavailable.
  • Corrupt metadata state; e.g. a bug leading to unrecoverable/invalid metadata or log state.

The proposed mechanism for dealing with both of these failure types is to enable a manual operator override mode. This would allow operators to inject metadata changes (potentially overriding the complete metadata state) directly on any and all nodes in a cluster. At the most extreme end of the spectrum, this could allow an unrecoverably corrupt state to be rectified by composing a custom snapshot of cluster metadata and uploading it to all nodes in the cluster. For failures which do not involve corrupted state, such as the permanent failure of a majority of CMS nodes, the state could be minimally modified to force a reconfiguration of the CMS without going through the usual consensus process. Once the CMS is back in an operable state (which initially simply constitutes a single, available member), regular operation mode can be resumed to perform any further recovery.

This is clearly a very powerful tool and so obviously its use entails a high degree of risk. It should, therefore, be subject to the most stringent security constraints. The exact form of those constraints is left open to discussion.

Cancelling Committed Events

It is sometimes necessary for multi-step operations which involve changes to cluster state to be cancelled before all of their steps are executed. For instance, when adding a new node to the cluster, the new node may permanently fail before completing bootstrap and joining the ring. In the current gossip-based system, after a period of inactivity (the FatClient timeout) the failed node is purged from cluster metadata by each node independently. Doing so triggers a recalculation of pending ranges, effectively reversing the changes made to replica groups prior to starting bootstrap. In the best case scenario then, a permanently failed bootstrap requires no operator intervention to restore the cluster to its previous state. However, reality often does not reflect the best case and the non-deterministic nature of the system can make failures difficult to reason about and bugs hard to identify, and can lead to additional toil, increased unavailability and, in the most serious failures, permanent data loss.

In the proposed system, the addition of the new node is written to the immutable log of metadata changes and so to remove it, a compensatory change must be made. From the perspective of the healthy nodes, the effect of the compensatory event is just the same as before; the metadata changes which added the bootstrapping node as a pending replica are reversed and the cluster metadata is returned to its prior state.
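
A compensatory change can be pictured as one more entry appended to the log, rather than the removal of an earlier one. In this minimal sketch the event kinds START_JOIN, FINISH_JOIN and CANCEL_JOIN and the function pending_replicas are hypothetical names chosen for illustration.

```python
from typing import FrozenSet, List, Tuple

Event = Tuple[str, str]  # (kind, node); the kinds are hypothetical


def pending_replicas(log: List[Event]) -> FrozenSet[str]:
    """Derive the set of pending (bootstrapping) nodes by replaying the log."""
    pending: FrozenSet[str] = frozenset()
    for kind, node in log:
        if kind == "START_JOIN":
            pending = pending | {node}
        elif kind in ("CANCEL_JOIN", "FINISH_JOIN"):
            pending = pending - {node}
    return pending


log: List[Event] = []
before = pending_replicas(log)

log.append(("START_JOIN", "n4"))      # n4 becomes a pending replica
during = pending_replicas(log)

# n4 fails permanently. History is never rewritten; a compensating
# entry is appended instead, advancing the log by one more epoch.
log.append(("CANCEL_JOIN", "n4"))
after = pending_replicas(log)
```

The derived state after the cancellation equals the state before the join started, while the log keeps both entries as a record of what happened.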

Because each node's view of liveness is both subjective and transient, it should never be a trigger for modifying cluster-wide state and its immutable history. This means that a mechanism such as the FatClient timeout can't be used to initiate the submission of a compensatory metadata change. If just a single node in the cluster becoming partitioned from a joining node could halt and revert the entire bootstrap, it would be a severe regression in terms of stability and operability. For this reason, such metadata changes must be initiated by operators or external systems with a holistic view of the cluster state. This proposal includes adding new commands for this to nodetool, e.g. nodetool cancel bootstrap etc.

New or Changed Public Interfaces

  • The role of gossip will be reduced so that critical changes to node state are no longer triggered by gossip (though tools like nodetool gossipinfo should continue to function as before)
  • The shadow round of gossip will become redundant as current cluster state can be determined by catching up to the tail of the metadata event log. New nodes joining a cluster will direct a simple query to their seed nodes to learn the endpoints of the current CMS members, enabling them to request the log history and catch up. Restarting nodes will already have the CMS membership list persisted locally, but will be able to query any known peer to ensure this is still current.
  • Pending range and disk boundary recalculation will not rely on gossip and can instead rely on commit notifications
  • Internode message format will be modified to incorporate metadata version info
  • Internally, usage of TokenMetadata will be overhauled, replacing it with immutable datastructures. Changes will be computed statically as metadata changing events are received and made visible.
  • ReplicationStrategy will become side-effect free (i.e. serve as a pure function of token ownership to placement); ReplicaCache will become redundant as an immutable, precomputed copy of placements is always available after event execution.
  • New tooling will be introduced for introspecting gossip state to help debug potential issues on non-CMS-enabled nodes in mixed-mode clusters.
  • Client tools will be added/extended to provide new functionality:
    • new nodetool commands to cancel operations which modify cluster metadata (see Cancelling Committed Events)
    • new virtual tables to view metrics and status of metadata log replication, CMS status & stats etc

Compatibility, Deprecation, and Migration Plan


The intention, at least in the immediate term, is to preserve the existing scope and representation of gossip state. This will ensure that tools like nodetool gossipinfo continue to be available to operators. The key difference will be that some application states will be set as side effects of applying metadata change events. Likewise, changes in gossip states will no longer drive local state changes on a node.

Similarly, this proposal does not include deprecating or otherwise moving away from the concept of a token ring, with each member of the cluster being designated as the primary replica for some portions of that ring, as denoted by the node's token assignment.

A key goal of this proposal is to minimise the scope of any operational changes. External interfaces will be unchanged and the processes for performing routine operational tasks, such as adding, removing and replacing nodes, should remain unchanged. The primary goal is to improve reliability and correctness without changing the processes themselves.

Migration Plan

This is a large and comprehensive feature, which touches many areas of the code base and modifies many critical subsystems. As such, it should be part of a major version change. However, a two phase migration plan is proposed, with certain pre-requisites included in a minor release of the preceding major version. Upgrading to this latest minor would be a mandatory requirement for a future safe major upgrade to a version including transactional Cluster Metadata.

One pre-feature that we would include in the preceding minor release is a node level switch to disable all operations that modify cluster metadata state. This would include schema changes as well as topology-altering events like move, decommission or (gossip-based) bootstrap and would be activated on all nodes for the duration of the major upgrade. If this switch were accessible via internode messaging, activating it for an upgrade could be automated. When an upgraded node starts up, it could send a request to disable metadata changes to any peer still running the old version. This would cost a few redundant messages, but simplify things operationally.

Although this approach would necessitate an additional minor version upgrade, this is not without precedent and we believe that the benefits outweigh the costs of additional operational overhead.

If this part of the proposal is accepted, we could also include further messaging protocol changes in the minor release, as these would largely constitute additional verbs which would be implemented with no-op verb handlers initially. This would simplify the major version code, as it would not need to gate the sending of asynchronous replication messages on the receiver's release version. During the migration, it may be useful to have a way to directly inject gossip messages into the cluster, in case the states of the yet-to-be upgraded nodes become inconsistent. This isn't intended, so such a tool may never be required, but we have seen that gossip propagation can be difficult to reason about at times.

As nodes are upgraded to the new major version, some will begin to assume roles as CMS members (per the reconfiguration protocol). Subsequently upgraded nodes will require some means to identify which of their peers have already been upgraded and to query them to learn the current membership of the CMS.

As detailed, during the major upgrade, operations which materially alter cluster state would ideally be disabled. It may be necessary to temporarily relax this constraint to address node failures in mixed mode clusters. However, once the upgrade is in flight, it should only be possible to add nodes to the cluster if they are running the new major version. If a non-upgraded node fails permanently, it can only be replaced with an upgraded one.

Test Plan

Code for this proposal will be tested using multiple comprehensive tools:

  1. Quorum Intersection Simulator: a simple tool that checks if all read and write placements in all epochs on all up nodes intersect. This tool gives an exhaustive result. In other words, it goes through all possible permutations of quorums, and is used as a part of the proof of the reconfiguration protocol correctness.
  2. Placement Simulator: simulates submission and execution of the events to CMS. Starting with a small cluster, we expand and shrink the cluster with a configured replication factor and allowed parallelism. On each step of the simulation, we check read and write placements and ownership for every range, making sure that they conform to invariants (i.e. replication factor, expected over-replication during streaming, guaranteed ownership of every token), and we also compare the production placement code with a simplified model.
  3. Coordinator/Replica Tests: simulates an arbitrarily-sized cluster from the perspective of a single node. In such a test, a single in-JVM DTest node is started, and the rest of the cluster is simulated using response handlers. Such tests are particularly useful for testing read/write paths and for ensuring that specific responses from replicas do in fact trigger desired behaviours on the coordinator side. This test scenario can also be reversed for a node to serve as a replica, in which case we would use real response handlers and instead simulate coordinator-side requests.
  4. Test CMS implementation: allows delaying commit or propagation of any event to any of the participants. Can be used in in-JVM DTests or on test clusters to test scenarios when the nodes have a divergent view of the cluster.
  5. Fuzz Testing: can be executed against in-JVM and external clusters, with tracking-enabled Harry read/write workloads. Includes proposed extensions to the CQL client protocol V6 (not detailed here) to add debug information to certain response messages. When specified by the client, responses from the nodes would contain information about the execution results, including a replica plan for a given query. Harry then can validate if the quorum guarantees were in fact achieved for every query.

Methods 1-4 are lower-level convenience tests, which allow scenarios, either manually constructed or based on exhaustive checks, to be validated quickly and efficiently. Fuzz testing (5) ensures we go beyond that, checking more sophisticated, potentially infrequently arising scenarios and executing long-running integration tests on arbitrarily sized clusters.

While preparing for this CEP, we did experiment with a TLA+ spec proving correctness of the epoch visibility protocol, but decided to leave it out as maintaining a comprehensive proof was very time consuming and does not guarantee correctness of the final product. We believe that test tools listed above are simpler to maintain, easier to understand by the community, and will provide a much higher level of confidence.

We believe that no CEP prior to this one had a test suite as comprehensive. Due to the scope and criticality of this feature in Cassandra, in addition to the lack of existing test coverage for cluster metadata changes, we consider all these measures to be necessary.

Rejected Alternatives

We have familiarised ourselves with the existing publications on the subject (see the list below): papers on Paxos and Raft reconfiguration, and generally configuration services for different distributed systems. Even though some of these papers had interesting ideas, very few of them were directly applicable to our situation in Cassandra and none of them came close to a holistic picture of a configuration service that can be sufficient for Paxos, multi-partition transactions, schema changes, and eventually consistent operations. We have based our service on a simple assumption that guarantees consistency: make minimal changes to the membership, step-wise. The design of the service is then "pulled by the bootstraps" (i.e. reconfiguration of the reconfiguration service itself uses the same mechanism as the rest of the cluster).

Some obvious contenders, such as an external service to manage metadata (e.g. ZooKeeper or etcd) were considered and rejected because implementing the persistence layer for the state is a small task compared to the amount of work required to actually design a reconfiguration protocol, make metadata immutable, and ensure desired consistency guarantees. The bulk of this CEP is focused on the required changes rather than the persistence layer itself, leaving it largely as an implementation detail.

Although the version of the CMS's event processing described in this document uses Paxos, this is entirely an implementation detail. In fact, to support some test cases, a simplified single node version backed by AtomicLong has been developed and nothing precludes further alternative implementations. However, we still strongly believe that using Paxos for configuration is the best way to proceed.

Even though the presence of “Transactional” in the title of the CEP is prescriptive and suggests that there has to be a single order for the events and a single distributed authority for establishing it, we did consider non-transactional approaches, such as using CRDTs and causal consistency as well. With causal consistency, we would still need a way to establish a single operation order, even if we need just a partial order for a specific range and not a total order for all events across the cluster. Similarly, CRDTs are useful as unbounded structures, while what we need here is strict validation ensuring that steps are coherent. The simplest, most straightforward, and most reliable way to achieve this was a global event order.

The first versions of this CEP draft had ownership groups synonymous with replica sets, and changes to neighboring replica sets requiring majorities from the involved sets to acknowledge the operation. However, we quickly realised that the log itself (similar to the persistence layer mentioned above) is the least of the problems here: even in a large cluster, even with dynamic range movements, it will be a low throughput store, so there’s no need to partition it (even though it would not be difficult to do this at some point in future, if necessary).

Before settling on what we’re presenting here, some subsystems/nuances also went through several cycles of improvement and simplification:

  1. A 2-step submit/commit process. To support resumable operations, events are submitted to the “pending” stage, from which a node could commit an event to the log. Admission to the pending stage is allowed on the condition that the addition would not contradict the execution of events already in the pending stage. This proposal was superseded by a simpler concept of range locking and persisting state for resumable bootstrap in the metadata state.
  2. Delayed epoch visibility. In order to guarantee required consistency, we can throttle visibility of the epoch, and make sure that events replicated to the nodes are not applied to the local metadata right away. Application of the epoch's event is allowed only after it has been acknowledged by the majority of replicas involved in the operation. In such a case, if the coordinator learns about the existence (but not acknowledgement by the majority of replicas) of the higher epoch either through replication or through the response from one of the replicas, it has to consult the cluster metadata service to check if the epoch should or should not be enacted. This proposal was superseded by the one which instead delays submission of the next event until the previous one for a given range is fully propagated.

Prior art


Future Work

Presently a Replica Group, within a given DC, is the sequence of nearest nodes to the token within that DC (and on different racks). This is inflexible and error prone. Calculating ownership changes, particularly when there are multiple in-flight, and some may fail or be cancelled, is challenging and has long been a source of bugs. A Replica Group should be defined for an explicit Data Range. It should include the set of nodes that replicate the range for reads and writes independently, and should explicitly list the nodes participating in each logical replica; for writes, this may include multiple nodes for a single logical replica, and during a transition, all of the nodes must respond for the vote to be counted as success.

 Range = [Start, End],
 Read  = [owner1,owner2,owner3],
 Write = [[owner1a,owner1b..],[owner2],[owner3]]

As shown in the preceding sections, ranges that have the same Replica Group can be split into two or more ranges, and then the new separated ranges can be re-assigned to new Replica Groups as necessary, using the above approach. The same can happen in reverse, with ranges we want to merge being moved to the same Replica Group, then merged into a single record.
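
The Replica Group record and the rule for counting write responses can be sketched as an immutable data structure. This is only one possible reading of the scheme: the class and method names are hypothetical, a majority of logical replicas is assumed as the success criterion, and the sketch does not decide which half owns the split point itself.

```python
from dataclasses import dataclass
from typing import Set, Tuple


@dataclass(frozen=True)
class ReplicaGroup:
    start: int                          # range bounds, as in Range = [Start, End]
    end: int
    read: Tuple[str, ...]               # one node per logical replica
    write: Tuple[Tuple[str, ...], ...]  # one or more nodes per logical replica

    def write_votes(self, acked: Set[str]) -> int:
        # A logical replica votes only if every node backing it has responded.
        return sum(1 for nodes in self.write if all(n in acked for n in nodes))

    def write_quorum_met(self, acked: Set[str]) -> bool:
        # Assumption: success means a majority of logical replicas voted.
        return self.write_votes(acked) >= len(self.write) // 2 + 1

    def split(self, at: int) -> Tuple["ReplicaGroup", "ReplicaGroup"]:
        # Both halves keep the same replicas; each can be reassigned later.
        if not self.start < at < self.end:
            raise ValueError("split point must fall strictly inside the range")
        return (ReplicaGroup(self.start, at, self.read, self.write),
                ReplicaGroup(at, self.end, self.read, self.write))


group = ReplicaGroup(
    start=0, end=100,
    read=("owner1", "owner2", "owner3"),
    write=(("owner1a", "owner1b"), ("owner2",), ("owner3",)),
)

# owner1b has not responded, so the first logical replica casts no vote.
partial = group.write_votes({"owner1a", "owner2"})
left, right = group.split(40)
```

Keeping the record immutable means a split or reassignment produces new records rather than mutating shared state, which fits the log-driven model described in the rest of the proposal.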

This scheme obviates the need to assign a node a token. We can simply assign nodes participation to a Data Range (i.e. a token range for a specific keyspace). It is unnecessary for a node to replicate the same ranges for every keyspace, or even to participate in replication for every keyspace. While in the initial implementation, we choose to simulate the ring, and by default to replicate the same ranges for every keyspace, this potential future enhancement affords us greater flexibility than we have today, particularly with multi-tenancy. Finally, while we may continue to refer to membership of "the ring" we can also stop modelling it as such: the token space has a well defined start and end, and by treating it as such we can avoid many wrap-around bugs that have occurred over the years.

This also means that nodes will be able to join the cluster for the purpose of serving as replicas for specific ranges, and range ownership can be completely dynamic: load, data-size, and node configuration dependent. Arbitrary ranges can be moved to arbitrary nodes, on keyspace/replication strategy level. In other words, it will be possible to have a cluster where only some of the nodes replicate one keyspace, and other nodes replicate the other ones.

There are many other operations that will benefit from having a consistent log. For example, authentication and role management, or scheduling clusterwide repairs.
