Motivation
Cassandra has two processes that detect and correct unreplicated writes: repair and read repair. Both rely on reading and comparing the actual data on disk. In addition to sometimes having to read a large amount of data, this approach creates different problems for each process.
In the case of repair, the current implementation streams the entire contents of one or more partitions between nodes if a mismatch is detected. This can unnecessarily create significant streaming and compaction load and this lack of granularity is one of the limiting factors on practical partition sizes.
In the case of read repair, since we only read and correct the parts of a partition covered by the read, and not the entire contents of the partition, read repair can break our guarantee of partition-level write atomicity. This approach also prevents meeting the monotonic read requirement for witness replicas, which has significantly limited their usefulness.
This CEP proposes fixing these shortcomings by tracking and reconciling individual mutations.
Goals
- Reduce replication lag with a continuous background reconciliation process
- Eliminate the disk load caused by repair merkle tree calculation
- Eliminate repair overstreaming
- Eliminate hints
- Reduce disk load of reads on cluster to close to 1/CL
- Fix single page mutation atomicity issues caused by read repair and short read protection
- Enable additional cluster topologies, e.g. log-only witness DCs
- Enable completion of witness replicas (addresses monotonic read issue)
Non-Goals
Replacing or deprecating existing replication or repair systems
Proposed Changes
On write, the coordinator generates a globally unique id for a client write and transmits the id along with the mutation to the replicas. The replicas will apply the mutation and keep track of the ids they have applied locally. On read, a single node will be queried for data and a summary of applied mutation ids, and instead of a data digest, the other replicas will only be queried for mutation id summaries. The coordinator will then compare the mutation id summaries received from each replica. If they match, the data response is returned; if they don't, the missing mutations are queried and applied to the read response and to the replicas that need them before responding to the client.
Instead of repairing data on disk, we reconcile the logs by comparing received mutation ids across all replicas for the range being reconciled and sending mutations to the replicas that are missing them. Log reconciliation will also establish a lower bound for mutation ids. The lower bound will enable gc of the mutation log and limit the number of mutation ids that need to be communicated during reads and later log reconciliations.
Below is more detail on changes to specific systems. Note that these changes are only active for tables using mutation tracking, and do not affect tables that aren't using it.
Schema
A keyspace/table property for replication type will be added to KeyspaceMetadata and TableMetadata. Available options for keyspaces are legacy and logged, with the default being legacy. Selecting logged will activate mutation tracking for the keyspace. Available options for tables are keyspace, legacy, and logged, with the default being keyspace, which inherits the keyspace setting. The changes below are only applicable on tables where mutation tracking is enabled.
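For illustration, resolving the effective setting for a table could look like the sketch below; the enum and method names are assumptions for this document, not the actual schema API.

```java
// Hypothetical sketch of the replication type property described above; names are
// illustrative and not the final schema API.
public enum ReplicationType
{
    LEGACY,    // existing behaviour, no mutation tracking
    LOGGED,    // mutation tracking enabled
    KEYSPACE;  // table-level default: inherit the keyspace's setting

    /** Resolve the effective setting for a table from its own and its keyspace's settings. */
    public static ReplicationType effective(ReplicationType keyspaceSetting, ReplicationType tableSetting)
    {
        return tableSetting == KEYSPACE ? keyspaceSetting : tableSetting;
    }
}
```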
Addressable Commit Log
Accord introduced an addressable log: basically a commit log with an index that allows looking up individual entries by id. For tables using mutation tracking, mutations will be indexed by mutation id so we can quickly retrieve the mutations needed for reconciliation between replicas. The addressable log will also maintain an index mapping partition keys to mutation ids, for reconciliation.
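The sketch below shows roughly what such a log could expose for mutation tracking; it is illustrative only, and the types are stand-ins rather than Cassandra's actual classes or Accord's actual API.

```java
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Optional;

// Illustrative sketch only: the real index will hang off the Accord log machinery,
// and these nested types are placeholders, not Cassandra's real Mutation/MutationId.
public interface AddressableMutationLog
{
    interface Mutation {}
    interface MutationId extends Comparable<MutationId> {}

    /** Append a mutation under its id, indexing it for later lookup. */
    void append(MutationId id, Mutation mutation);

    /** Fetch a single mutation by id, e.g. to forward it to a replica that missed it. */
    Optional<Mutation> lookup(MutationId id);

    /** Ids applied to a given partition key, for partition-level reconciliation. */
    Collection<MutationId> idsForPartition(ByteBuffer partitionKey);

    /** Drop index entries at or below the reconciled lower bound so the log can be gc'd. */
    void pruneUpTo(MutationId reconciledLowerBound);
}
```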
Mutation Summarization
Enumerating and transmitting large sets of mutation ids as part of the read path and log reconciliation processes would get expensive if mutations couldn't be reconciled regularly (e.g. during an extended node outage). Fortunately, there's been a lot of research in distributed set reconciliation over the past decade, and we can use a combination of hashing and probabilistic data structures to maintain compact in-memory summaries of locally applied mutation ids that can be sent in place of raw id lists. This will save us from going to disk to answer most non-data read requests, and minimize message payload sizes, even when longer outages prevent gc of mutation id data.
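As a deliberately naive stand-in for such a summary, the sketch below keeps a count and an order-independent hash of applied ids per writing node; two replicas whose per-node counts and hashes all match have, with high probability, applied the same set of ids. The actual structure is expected to be more sophisticated, so this only shows the shape of the idea.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Naive illustration of a compact, in-memory summary of locally applied mutation ids.
public final class MutationIdSummary
{
    private static final class NodeDigest
    {
        long count;
        long xorOfHashes;   // commutative, so application order doesn't matter
    }

    private final Map<Integer, NodeDigest> perNode = new HashMap<>();

    /** Record that the mutation (nodeId, hlc) has been applied locally. */
    public void add(int nodeId, long hlc)
    {
        NodeDigest d = perNode.computeIfAbsent(nodeId, k -> new NodeDigest());
        d.count++;
        d.xorOfHashes ^= Objects.hash(nodeId, hlc);
    }

    /** True if both summaries describe the same set of ids (with high probability). */
    public boolean matches(MutationIdSummary other)
    {
        if (!perNode.keySet().equals(other.perNode.keySet()))
            return false;
        for (Map.Entry<Integer, NodeDigest> e : perNode.entrySet())
        {
            NodeDigest mine = e.getValue();
            NodeDigest theirs = other.perNode.get(e.getKey());
            if (mine.count != theirs.count || mine.xorOfHashes != theirs.xorOfHashes)
                return false;
        }
        return true;
    }
}
```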
Write path
On the write path, writes coordinated for tables using mutation tracking will generate a unique id composed of the node's TCM id and a monotonically increasing value derived from a hybrid logical clock. This id will be written out to the replicas and stored with the commit log entry. Hints will no longer be written for failed writes.
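For illustration, the id could look something like the record below; the field names and widths are assumptions, not the proposed wire format.

```java
// Rough sketch of the id described above: the coordinating node's TCM id plus a
// hybrid logical clock value.
public record MutationId(int nodeId, long hlc) implements Comparable<MutationId>
{
    /** Order by time first so ids can be compared against a reconciliation's time bound. */
    @Override
    public int compareTo(MutationId other)
    {
        int byTime = Long.compare(hlc, other.hlc);
        return byTime != 0 ? byTime : Integer.compare(nodeId, other.nodeId);
    }
}
```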
As part of the log reconciliation process, described later, nodes will reject writes with a mutation id time lower than the time of any reconciliations that are in progress or have been completed.
Read path
On read, one replica will be queried for a data response and the relevant mutation id summaries; the other replicas needed to reach the consistency level will only be queried for mutation id summaries. This is essentially the same logic as data/digest requests, with mutation id metadata in place of digests. Once the coordinator has received enough responses to satisfy the CL, the mutation id summaries are compared. If they match, the data response is returned. If participants are missing mutations, the read coordinator performs the mutation id equivalent of read repair: it coordinates the exchange of missing mutations between participants and applies them to the data response. Exchanging entire missing mutations here fixes the write atomicity problems caused by the current read repair implementation (which only repairs the data covered by the read command, and can tear mutations). Note that we're not intending to actually exchange lists of mutation ids; we will instead use some combination of hashes and probabilistic data structures to detect differences in applied mutation ids between nodes.
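As a condensed illustration of the coordinator-side resolution step, here's a minimal sketch using placeholder types; none of these classes are Cassandra's actual read path classes.

```java
import java.util.List;

// Hypothetical sketch of tracked-read resolution: one data response plus
// summary-only responses from the other replicas needed to reach the CL.
public final class TrackedReadResolver
{
    public interface Summary { boolean matches(Summary other); }
    public interface Response { Summary summary(); }
    public interface DataResponse extends Response { Object data(); }

    public interface Reconciler
    {
        /** Exchange missing mutations between participants and return the repaired data. */
        Object reconcile(DataResponse data, List<Response> summaries);
    }

    /**
     * If every summary matches the data response's summary, the data is returned as-is;
     * otherwise missing mutations are exchanged and applied before responding.
     */
    public static Object resolve(DataResponse data, List<Response> summaryOnly, Reconciler reconciler)
    {
        boolean allMatch = summaryOnly.stream().allMatch(r -> data.summary().matches(r.summary()));
        return allMatch ? data.data() : reconciler.reconcile(data, summaryOnly);
    }
}
```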
Storage layer
Tombstone purging
In addition to the gc grace period, tombstones with a local deletion time greater than the most recent reconciliation time are not purged. This is analogous to the only_purge_repaired_tombstones setting.
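A minimal sketch of the resulting purge check, assuming deletion and reconciliation times expressed as plain epoch seconds (a simplification):

```java
// Illustrative purge predicate for tables with mutation tracking enabled.
public final class TombstonePurging
{
    /**
     * A tombstone is only purgeable once it is past gc_grace AND it was deleted
     * at or before the most recent completed reconciliation.
     */
    public static boolean isPurgeable(long localDeletionTime, long gcBefore, long lastReconciledAt)
    {
        return localDeletionTime < gcBefore && localDeletionTime <= lastReconciledAt;
    }

    private TombstonePurging() {}
}
```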
Log reconciliation
A background process will continuously poll replicas for the mutation ids they've received and coordinate the delivery of missed mutations between them. Once reconciliation has completed for some range of time, mutation log data below that time can be garbage collected. Each reconciliation round proceeds as follows (a rough sketch follows the list):
- select a reconciliation time, backdated by the write timeout period + some wiggle room to prevent writes timing out
- instruct replicas to stop accepting new mutations < reconciliation time for the range being reconciled
- collect mutation ids received from each replica < reconciliation time
- coordinate the delivery of missing mutations between the nodes
- update replicas' mutation id low bound with the new reconciliation time
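Here is the rough coordinator-side shape of one round, using placeholder interfaces (nothing here is the proposed API, and failure handling and messaging details are omitted):

```java
import java.util.Set;

// Hypothetical outline of one reconciliation round for a single range.
public final class LogReconciliation
{
    /** Stand-in for the compact summary structure sketched earlier. */
    public interface IdSummary {}

    public interface Replica
    {
        /** Reject new mutations with ids at or below the given time for this range. */
        void closeRange(Object range, long reconciliationTime);
        /** Summary of the mutation ids this replica has applied below the given time. */
        IdSummary idsBelow(Object range, long reconciliationTime);
        /** Forward any mutations the target's summary shows it is missing. */
        void forwardMissingTo(Replica target, IdSummary targetSummary);
        /** Advance the mutation id low bound, allowing log data below it to be gc'd. */
        void updateLowBound(Object range, long reconciliationTime);
    }

    public static void reconcile(Object range, Set<Replica> replicas,
                                 long nowMillis, long writeTimeoutMillis, long slackMillis)
    {
        // Backdate the reconciliation time so in-flight writes aren't timed out by the fence.
        long reconciliationTime = nowMillis - writeTimeoutMillis - slackMillis;

        // Replicas stop accepting mutations below the reconciliation time for this range.
        for (Replica replica : replicas)
            replica.closeRange(range, reconciliationTime);

        // Collect each replica's applied ids and deliver whatever it is missing.
        for (Replica target : replicas)
        {
            IdSummary targetSummary = target.idsBelow(range, reconciliationTime);
            for (Replica source : replicas)
                if (source != target)
                    source.forwardMissingTo(target, targetSummary);
        }

        // All replicas now agree below the reconciliation time; advance the low bound.
        for (Replica replica : replicas)
            replica.updateLowBound(range, reconciliationTime);
    }
}
```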
Cohort reconciliations
In situations where a replica is down for days, full reconciliation wouldn't be possible, since it requires participation from all replicas to succeed. During this time, we'd be accumulating a lot of log data that would need to be stored, summarized, and ultimately reconciled when the outage ends.
A worse case is an inter-DC network partition with local quorum queries continuing in their respective DCs. There, you'd accumulate a large amount of disjoint data that would need to be reconciled when the partition heals, and you don't want to be operating at the granularity of individual mutation ids.
In cases like these, the system would fall back to cohort reconciliations. These are similar to the full reconciliation process, but are only done with the subset of nodes that are available. Unlike full reconciliations, data reconciled as part of a cohort reconciliation is kept separate, and the mutation id low bound is not updated at the end of the reconciliation. This keeps the available nodes as up to date as possible, and avoids needing to transmit mutations on the read path, or to nodes other than those that were unavailable, once the range is healthy again.
When dealing with extended partitions, we still perform partial reconciliations. So long as the replica set participating in these partial reconciliations remains constant, we can segregate the partially reconciled data under a unique id (a cohort reconciliation id) and discard the mutation id metadata that was reconciled. When the network partition is healed, nodes will stream in data assigned to cohort reconciliations that they did not participate in.
Note that the cohort id doesn't change every time there's a partial reconciliation, it changes every time the replica set participating in a partial reconciliation changes.
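A toy illustration of that behaviour, with replica addresses reduced to strings and the cohort id to a random UUID (both simplifications):

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// The cohort id is keyed by the set of participating replicas: repeated partial
// reconciliations among the same subset reuse one id, and a new id is only minted
// when the participating subset changes.
public final class CohortIds
{
    private final Map<Set<String>, UUID> cohortIds = new ConcurrentHashMap<>();

    /** Return the cohort id for this replica set, creating one the first time it is seen. */
    public UUID cohortIdFor(Set<String> participatingReplicas)
    {
        // Normalize so the same membership always maps to the same key regardless of order.
        Set<String> key = new TreeSet<>(participatingReplicas);
        return cohortIds.computeIfAbsent(key, k -> UUID.randomUUID());
    }
}
```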
Repair
Log reconciliation will take the place of incremental repairs run from nodetool; full repairs run from nodetool will still compare and stream data on disk. Paxos and Accord repairs aren't changed.
Bootstrap / Topology Changes
Since unreconciled mutation ids are stored in sstables, we don't need to stream logs in addition to sstables, or require any additional log operations before pending nodes are promoted to full nodes and begin participating in reads/reconciliations. New nodes won't be able to transmit individual mutations during reconciliation, since those would have to come out of sstables, so we'll probably want a flag indicating they're not the preferred source for missing mutations they only have in sstables, but the data will still be accessible if needed.
The joining / streaming process is as follows (a rough sketch follows the list):
- Determine the current mutation id low bound for the incoming range(s)
- Update topology to make node a write only replica for incoming ranges and begin accepting writes
- If possible, increment the low bound by running a full reconciliation for the incoming ranges. This isn't mandatory, but will limit the number of mutations this node may need to transmit from sstables in later reconciliations.
- Stream incoming ranges. If using strict consistency (i.e. not a replacement and strict consistency hasn't been explicitly disabled), stream from the node giving up ownership of the incoming ranges; otherwise stream from the closest node.
- If we didn't stream with strict consistency, run a partial reconciliation with the nodes that are available.
- Node is promoted to full replica for the incoming ranges
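Here's a rough sketch of that flow for a single incoming range, using placeholder interfaces for topology, reconciliation, and streaming; none of these are the actual APIs, and TCM updates and failure handling are glossed over entirely.

```java
// Hypothetical outline of the joining flow described above.
public final class TrackedBootstrap
{
    public interface Topology
    {
        void addWriteOnlyReplica(String node, Object range);   // start accepting writes
        void promoteToFullReplica(String node, Object range);  // begin serving reads / reconciliations
        boolean strictConsistency();
    }

    public interface Reconciliations
    {
        long currentLowBound(Object range);
        void tryFullReconciliation(Object range);   // best effort; raises the low bound if it succeeds
        void partialReconciliation(Object range);   // with whichever replicas are available
    }

    public interface Streaming
    {
        void streamFromPreviousOwner(Object range);
        void streamFromClosestReplica(Object range);
    }

    public static void join(String node, Object range, Topology topology,
                            Reconciliations reconciliations, Streaming streaming)
    {
        long initialLowBound = reconciliations.currentLowBound(range); // determine the current low bound (not otherwise used in this sketch)
        topology.addWriteOnlyReplica(node, range);                     // write-only replica for the incoming range
        reconciliations.tryFullReconciliation(range);                  // optional: raise the low bound first
        if (topology.strictConsistency())
        {
            streaming.streamFromPreviousOwner(range);                  // strict consistency: stream from the previous owner
        }
        else
        {
            streaming.streamFromClosestReplica(range);                 // otherwise stream from the closest node
            reconciliations.partialReconciliation(range);              // then reconcile with the available nodes
        }
        topology.promoteToFullReplica(node, range);                    // finally, a full replica for the range
    }
}
```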
Test Plan
In addition to the standard unit tests and dtests covering the individual patches, the Paxos simulator will be expanded to simulate and validate normal Cassandra reads and writes. This will be used to verify the correctness of replication via mutation logs in a wide variety of cluster configurations, schemas, etc. There will also be exhaustive testing via Harry, including fault injection, as well as muxing Harry validation runs with instance replacements.
What is the write atomicity issue?
Cassandra promises partition level write atomicity. This means that, although writes are eventually consistent, a given write will either be visible or not visible. You're not supposed to see a partially applied write. However, read repair and short read protection can both "tear" mutations. In the case of read repair, this is because the data resolver only evaluates the data included in the client read. So if your read only covers a portion of a write that didn't reach a quorum, only that portion will be repaired, breaking write atomicity.
Mutation tracking addresses this issue by reconciling at the mutation level. If different read replicas return different results, missing mutations are exchanged between them and applied, so there is no opportunity for a mutation to only be partially applied.
Short read protection has a similar issue to read repair. Since it performs a follow-up read after read repair in an attempt to reach the row limit, it can see mutations that have been applied since the first read. If the follow-up read starts in the middle of the data written by the newer mutation, then the read will see a partially applied write, even if that write was successful.
Mutation tracking addresses the short read protection issue by forwarding newer mutations to the coordinator for inclusion in the client result.
Since (from the perspective of the storage layer) paged reads are implemented as separate read requests, this would not maintain atomicity across different page requests.
Enabling Witness Replicas
Here's a quick refresher on why witness replicas can't be used for most workloads, and how mutation tracking fixes it. The short answer is that it enables read monotonicity with partial datasets. Read monotonicity is an important property of quorum reads and writes. If a write fails, it may only be replicated to a less-than-quorum subset of replicas. If this happens, whether a quorum read will include it depends on which specific nodes are read from.
This can cause "flapping" data, where the contents of this partially applied write appear and disappear from quorum reads depending on which specific nodes are read from. Since this is difficult for applications to reason about, we have read repair. Read repair ensures that partially applied writes are replicated to at least a quorum of replicas before the read returns to the client. This ensures that reads started after the repairing read returns don't omit data that was returned in an earlier quorum read, and this provides read monotonicity.
In the case of witness replicas, the witnesses only contain unrepaired data, so they never match the full replicas. Although this is by design, the current implementation doesn't have a way to determine which of the data contained by the witnesses has reached a quorum. So to guarantee read monotonicity we'd have to read repair on nearly every read, which isn't acceptable. The other option is to forgo read monotonicity, which is not acceptable for most applications.