Audience: All Cassandra Users and Developers
User Impact: Support for fast general purpose transactions
Whitepaper: Accord
GitHub: https://github.com/apache/cassandra-accord
Status
Current state: Accepted
Discussion thread: https://lists.apache.org/thread/xgj4sym0d3vox3dzg8xc8dnx4c8jb4d5 , https://lists.apache.org/thread/j402lzzf7m699zc2vk23vgfxz8wwtlyl
JIRA: CASSANDRA-17092
Motivation
Users must expend significant effort to modify their database consistently while maintaining scalability. Even simple transactions involving more than one partition may become complex and error-prone, as a distributed state machine must be built atop the database. Conversely, packing all of the state into one partition is not scalable.
Performance also remains an issue, despite recent Paxos improvements: latency is still twice its theoretical minimum over the wide area network, and suffers particularly badly under contention.
This work aims to improve Cassandra to support fast general purpose transactions: those that may operate atomically over any set of keys in the database, modifying their contents at once, with any action conditional on the existing contents of any key.
Goals
- General purpose transactions (may operate over any keys in the database at once)
- Strict-serializable isolation
- Optimal latency: one wide area round-trip for all transactions under normal conditions
- Optimal failure tolerance: latency and performance should be unaffected by any minority of replica failures
- Scalability: there should be no bottleneck introduced
- Should have no intrinsic limit to transaction complexity
- Must work on commodity hardware
- Must support live migration from Paxos
User Impact
Batches (including unconditional batches) on transactional tables will receive ACID properties, and grammatically correct conditional batch operations that would be rejected for operating over multiple CQL partitions will now be supported, e.g.
BEGIN BATCH
    UPDATE tbl1 SET value1 = newValue1 WHERE partitionKey = k1
    UPDATE tbl2 SET value2 = newValue2 WHERE partitionKey = k2 AND conditionValue = someCondition
APPLY BATCH
Long Term
This work is expected to replace Paxos for the project. In order to support live migration both protocols will be maintained for at least an interim period, after which it is expected that Paxos will be deprecated. The precise timing of this will be up to the community.
While not an initial goal, supporting transactional global secondary indexes and supporting non-global modes of operation (i.e. to replace LOCAL_SERIAL for latency sensitive users) are future goals we anticipate enabling with this work.
This work shall be developed in a modular manner, to allow for coexistence with other consensus protocols or transaction managers. This will allow us to evolve Accord without precluding alternative solutions, as future work expands Cassandra's transactional capabilities beyond the goals of this CEP. Initially, supporting the Paxos-based LWT and Accord side by side is also an example of such modularity and optionality.
Non-Goals
- UX improvements to exploit this facility are considered out of scope, and will be addressed in a separate CEP
- SASI, SAI and local secondary indexes
Prior Work
Existing databases solve this problem by using either a global leader (FaunaDB, FoundationDB) or by a complex combination of multiple leaders including a transaction log and per-key leaders (DynamoDB, CockroachDB, YugaByte). The academic literature has also outlined leaderless approaches that have not yet been utilised.
Global Leader
This category of approach is simple and correct but introduces a scalability bottleneck that would be irreconcilable with the size of many Cassandra clusters.
Multiple Leaders
DynamoDB, CockroachDB and YugaByte utilise variants of this approach. It appears to be a very complex setup. The latency that can be achieved is unclear, but is unlikely to be better than two round-trips in the general case, since leaders are not guaranteed to be co-located (and nor are clients with leaders). Importantly, these approaches appear either to require specialised hardware clocks or to provide only serializable isolation, and may well have throughput restrictions to provide any expectation of strict-serializable isolation even without a strong guarantee of maintaining it. They also suffer from aborts. They do offer optimal failure tolerance and are scalable.
Janus
Strict-serializable isolation may be easily extended to cross-shard transactions using EPaxos-derived approaches, as was first demonstrated by Janus. In its original formulation Janus is fundamentally a more limited variant of EPaxos that was expanded to the cross-shard case. It has terrible failure properties, requiring every replica to agree to take the fast path. Fortunately, all leaderless protocols based on the intuitions of EPaxos can likely be modified to support cross-shard transactions, so we may assess all such protocols.
EPaxos
While Janus has poor failure properties, the EPaxos protocol can be extended in a similar fashion while achieving almost the same failure properties - we must only increase the quorum size by one, or else utilise a proxy coordinator for each shard. EPaxos is the first leaderless protocol to demonstrate a fast path; however, it suffers under high contention, as every fast path participant must have witnessed every dependency in order for the fast path to be agreed.
Strict-serializable isolation is not a property of the basic EPaxos protocol, however it is possible to support it by responding to clients only once a transaction’s dependencies are committed. This is an acceptable but unnecessary limitation, as we will later demonstrate.
Latency of EPaxos is good if there are no conflicts, but like other leaderless protocols its failure properties are poor: the fast path will be disabled once a quarter of replicas are unreachable, so that the system will incur significantly higher load and slower transactions at this point, creating a risk of cascading failure.
Caesar
This protocol builds on the intuitions of EPaxos, but instead of unanimously agreeing dependencies it unanimously agrees an execution timestamp. This confers a lower conflict rate, as each voting replica may witness a different history - we only require that some super-majority sees the transaction’s timestamp before any higher timestamp. The use of a timestamp confers additional benefits we will outline below.
Caesar builds a set of dependent transactions that are equivalent to those of EPaxos in the aftermath of agreeing an execution timestamp, where the dependencies are consistent with the timestamp execution order but permit commutative operations (mostly reads with respect to other reads) to not interfere with each other, so that they may be performed concurrently.
However, Caesar has downsides: transactions may livelock indefinitely (this is implausible, though in practice it may still lead to latency spikes), and its worst case latency is three wide area round-trips. It has the same suboptimal failure properties as other leaderless protocols.
Tempo
Tempo improves upon Caesar for latency, guaranteeing two-round agreement without caveat. Unfortunately it suffers from a read scalability bottleneck due to its inability to exploit transaction commutativity, and has particularly poor failure properties.
Proposed Approach
These existing leaderless approaches achieve low latency for uncontended keys so long as a super-majority of replicas remain healthy. However, none has optimal failure characteristics: latency still suffers under contention, and the failure of as few as a quarter of replicas forces the system onto slow-path consensus, harming latency and increasing cluster load.
We propose a new leaderless timestamp protocol Accord, with better properties than Caesar or Tempo and for which we have derived formal proofs of correctness (to be published at a later date). We also propose two new techniques: a Reorder Buffer that guarantees one round-trip consensus with a timestamp protocol, and Fast Path Electorates which permit a leaderless fast-path to remain accessible under worst-tolerated failures.
We propose incubating Accord within the project to be developed as a standalone library. It is designed for integration into Cassandra with the sole purpose of supporting these goals. A prototype has been developed [link] that has demonstrated its correctness against Jepsen.io’s Maelstrom tool and a similar in-tree facility, but remains incomplete and not ready for production use. We propose incorporating it into the project under a new repository to be developed alongside necessary integrations and improvements to Cassandra.
Reorder Buffer
This technique is very simple: the cluster determines (through some secondary mechanisms) the maximum clock skew between any two nodes in a replica set, and the point-to-point latencies. Timestamp proposals are then buffered at replicas for a period equal to this clock skew and the longest point-to-point latency (minus the latency between the transaction’s coordinator and the receiving replica). This means messages are only processed once any other coordinator’s message that might conflict must have arrived (unless the coordinator process was unhealthy and stalled, for instance). Messages are then processed in timestamp order, guaranteeing that they can be acknowledged for fast path participation. By performing this on every replica, a coordinator is guaranteed to obtain a fast path quorum.
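The hold period described above can be sketched as follows. This is a minimal illustration, not the Accord implementation: the names `hold_period_ms` and `ReorderBuffer`, the millisecond units, and the `(source, destination)` latency map are all assumptions made for the example, and the cluster is assumed to have already measured the clock skew and latencies through some other mechanism.

```python
import heapq


def hold_period_ms(max_clock_skew_ms, latency_ms, coordinator, replica):
    """Hold period = max clock skew + longest point-to-point latency
    - latency between this coordinator and the receiving replica."""
    longest = max(latency_ms.values())
    return max_clock_skew_ms + longest - latency_ms[(coordinator, replica)]


class ReorderBuffer:
    """Hypothetical per-replica buffer that releases proposals in timestamp order."""

    def __init__(self):
        self._heap = []  # entries are (timestamp, txn_id, release_at_ms)

    def offer(self, timestamp, txn_id, arrived_at_ms, hold_ms):
        # Record the proposal together with the earliest time it may be processed.
        heapq.heappush(self._heap, (timestamp, txn_id, arrived_at_ms + hold_ms))

    def drain(self, now_ms):
        # Pop only from the head, so a lower timestamp that is still being held
        # also holds back any higher timestamp queued behind it.
        released = []
        while self._heap and self._heap[0][2] <= now_ms:
            timestamp, txn_id, _ = heapq.heappop(self._heap)
            released.append((timestamp, txn_id))
        return released
```

With a skew of 5 ms and latencies of 10 ms (coordinator A) and 40 ms (coordinator B) to the same replica, A's proposals are held for 35 ms and B's for 5 ms, so a proposal from B with a lower timestamp is processed before a proposal from A that arrived earlier.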
This approach can be modified further to await only those messages from coordinators that require our participation to reach fast path agreement (i.e. in heterogeneous setups where some coordinator is farther away but is able to reach quorum nearer to itself, it might be possible to wait only on those messages from nearer coordinators that likely depend on our participation for their fast path).
Fast Path Electorate
We define an electorate to be those replicas that may vote on a decision. Ordinarily this is the full replica set, but for fast path decisions it is possible to exclude some replicas, so that the electorate is a subset of the full replica set. In this case fewer votes are needed to reach fast-path consensus. Specifically, for every two nodes removed, one fewer vote is needed. This is based on the observation that fast path quorums are sized to simultaneously intersect any other fast path quorum and any recovery quorum (typically a simple majority). This is where the fast path quorum size of 3/4 of replicas is derived from (EPaxos reduces this by one by requiring that the coordinator is a replica, however it does so in a manner that makes cross-shard transactions exceptionally complex and a reorder buffer unworkable, so we discard this particular optimisation as this new approach makes it redundant).
To demonstrate that this approach may always reach consensus under maximal failure, we can first observe that in this eventuality only a single simple quorum is reachable. We may configure the fast path electorate to contain only the members of this one simple quorum, so that all fast and slow path operations must use this same quorum. By definition, this one quorum must overlap with any other simple quorum we might use for recovery, and also by definition overlaps with every other fast path quorum (as there is only one possible fast path quorum in this configuration). Since all of the other nodes are offline, we lose nothing by restricting our fast path agreement to this single quorum, but gain optimal failure properties by being able to do so.
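The sizing rule can be made concrete with a little arithmetic. The sketch below is derived here from the intersection requirement stated above, not taken from the Accord library, and the function names are hypothetical. With n replicas, f = (n - 1) / 2 tolerated failures (rounded down) and an electorate of e replicas, two fast path quorums of size q share at least 2q - e members; a recovery quorum may miss up to f of them, so we need 2q - e > f.

```python
def max_tolerated_failures(replicas):
    """Largest minority of a replica set."""
    return (replicas - 1) // 2


def simple_quorum_size(replicas):
    """Simple majority, used for the slow path and for recovery."""
    return replicas - max_tolerated_failures(replicas)


def fast_path_quorum_size(replicas, electorate):
    """Smallest q with 2q - electorate > f (assumption: derived from the
    intersection property described in the text)."""
    f = max_tolerated_failures(replicas)
    if not simple_quorum_size(replicas) <= electorate <= replicas:
        raise ValueError("electorate must contain at least a simple quorum")
    return (electorate + f) // 2 + 1


# Nine replicas: shrinking the electorate by two removes one required vote.
for electorate in (9, 7, 5):
    print(electorate, fast_path_quorum_size(9, electorate))
```

At the smallest permitted electorate (five of nine replicas) the fast path quorum is the electorate itself, which is also the only simple quorum still reachable under maximal failure.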
Accord
In order to exploit the above innovations, we must select a timestamp protocol. To ensure reads are scalable we must also derive dependencies, so Caesar is the only candidate in the literature. Unfortunately Caesar has suboptimal characteristics: ACKs to older transactions may be delayed by newer ones (in theory indefinitely), and three round-trips may be required. Fortunately this can be remedied. Caesar appears to suffer this fate due to assembling a precise set of dependencies so that only one possible set of dependencies may be committed (and perhaps also because of its recovery protocol). Accord instead assembles an inconsistent set of dependencies. By inconsistent we mean that it may differ between coordinators, and we are OK with different sets of dependencies being committed to different replicas. We only require that all of these sets of dependencies are a superset of those that may be committed with a lower timestamp.
Preliminaries
We use hybrid logical clocks that are globally unique, that is each replica has its own unique id that is appended to each logical clock value.
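A minimal sketch of such a clock follows, assuming a timestamp made of a wall-clock component, a logical counter and the issuing node's id compared in that order. The class and field names are hypothetical and the layout is an illustration only, not the encoding Accord uses.

```python
import time
from dataclasses import dataclass


@dataclass(frozen=True, order=True)
class Timestamp:
    wall_ms: int   # physical component
    logical: int   # counter used when the physical clock does not advance
    node_id: int   # unique per replica, so no two nodes issue equal timestamps


class HybridLogicalClock:
    def __init__(self, node_id, now_ms=lambda: int(time.time() * 1000)):
        self.node_id = node_id
        self._now_ms = now_ms
        self._wall = 0
        self._logical = 0

    def next(self):
        """Issue a timestamp strictly greater than any issued or witnessed so far."""
        physical = self._now_ms()
        if physical > self._wall:
            self._wall, self._logical = physical, 0
        else:
            self._logical += 1
        return Timestamp(self._wall, self._logical, self.node_id)

    def witness(self, other):
        """Advance the clock past a timestamp received from another node."""
        if (other.wall_ms, other.logical) > (self._wall, self._logical):
            self._wall, self._logical = other.wall_ms, other.logical
```

Because the node id is the final tie-breaker, two replicas that read the same physical time still produce distinct, totally ordered timestamps.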
Fast Path
A coordinator C proposes a timestamp t0 to at least a quorum of a fast path electorate. If t0 is larger than all timestamps witnessed for all prior conflicting transactions, t0 is accepted by a replica. If a fast path quorum of responses accept, the transaction is agreed to execute at t0. Replicas respond with the set of transactions they have witnessed that may execute with a lower timestamp, i.e. those with a lower t0.
Fast Path
Coordinator C:
    Send PreAccept(X, t0) to replicas of all shards
Replica R:
    if (have witnessed a newer conflicting timestamp) then
        t = some new higher timestamp issued by R
    else
        t = t0
    PreAccepted[X] = true
    Reply (t, deps = {conflicting transactions where t0 < t})
Coordinator C (with at least a simple quorum from each shard):
    If (a fast-path quorum of responses from each shard had t = t0) then
        send Commit(X, t0, t0, union of all deps)
        go to Execution
    ...
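The fast path exchange can be illustrated with a small in-memory model. This is a sketch under simplifying assumptions, not the Accord implementation: there is a single shard, every replica replies, timestamps are `(logical, node_id)` tuples, two transactions conflict when their key sets overlap, and the names `Replica`, `pre_accept` and `coordinate_fast_path` are hypothetical.

```python
class Replica:
    def __init__(self, node_id):
        self.node_id = node_id
        self.witnessed = {}  # txn_id -> (t0, keys)
        self.max_seen = {}   # key -> highest timestamp witnessed for that key

    def pre_accept(self, txn_id, t0, keys):
        newest = max((self.max_seen[k] for k in keys if k in self.max_seen),
                     default=None)
        if newest is not None and newest > t0:
            # A newer conflicting timestamp was witnessed: issue a higher one.
            t = (newest[0] + 1, self.node_id)
        else:
            t = t0
        # Conflicting transactions whose t0 is lower than the timestamp returned.
        deps = {other for other, (other_t0, other_keys) in self.witnessed.items()
                if other_keys & keys and other_t0 < t}
        self.witnessed[txn_id] = (t0, keys)
        for k in keys:
            self.max_seen[k] = max(self.max_seen.get(k, t), t)
        return t, deps


def coordinate_fast_path(txn_id, t0, keys, replicas, fast_quorum):
    replies = [r.pre_accept(txn_id, t0, keys) for r in replicas]
    deps = set().union(*(d for _, d in replies))
    if sum(1 for t, _ in replies if t == t0) >= fast_quorum:
        return "commit", t0, deps
    # No fast path quorum: hand the highest witnessed t to the slow path.
    return "slow_path", max(t for t, _ in replies), deps


replicas = [Replica(1), Replica(2), Replica(3)]
print(coordinate_fast_path("X", (10, 1), {"a"}, replicas, fast_quorum=3))
print(coordinate_fast_path("Y", (5, 2), {"a"}, replicas, fast_quorum=3))
```

Here X is agreed at its proposed timestamp, while Y proposes a timestamp older than one the replicas have already witnessed for key `a`, so every replica answers with a higher t and Y falls through to the slow path with X as a dependency.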
Slow Path
If a replica refuses to accept t0 it responds with a higher t than any other it has witnessed. If C fails to reach fast path consensus it takes the highest t it witnessed from its responses, which constitutes a simple Lamport clock value imposing a valid total order. This value is proposed to at least a simple majority of nodes, along with the union of the dependencies received in the PreAccept phase. This round’s purpose is only to record durably which Lamport clock value that might be derived was selected (as multiple valid Lamport clock values might be obtained depending on which responses were received by C), so that if C crashes a recovery coordinator will pick the same timestamp. The inclusion of dependencies in the proposal is solely to facilitate Recovery of other transactions that may be incomplete - these are stored on each replica to facilitate decisions at recovery. Replicas as a result always accept the proposal (unless a newer ballot has been issued by a recovery coordinator to take over the transaction), and once a majority have accepted the proposal it is durably decided. Replicas respond with a new set of dependencies containing those transactions they have witnessed with a lower t0 than the t they received. The coordinator discards the dependencies it received previously on the fast path and uses these new dependencies for execution.
Slow Path
Coordinator C:
    ...
    else
        t = maximum t from responses
        send Accept(X, t0, t, deps) to replicas of all shards
Replica R receiving Accept(X, t0, t, deps):
    Accepted[X] = true
    Reply (deps = {conflicting transactions where t0 < t})
Coordinator C (with a simple quorum from each shard):
    send Commit(X, t0, t, union of all deps)
    go to Execution
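A sketch of the slow path round, under the same kind of simplifying assumptions (one shard, in-memory replicas, `(logical, node_id)` timestamps, conflicts defined by overlapping key sets). The names `Replica`, `accept`, `slow_path` and the integer `ballot` are hypothetical; the sketch only illustrates that the chosen t is recorded and that the dependencies from this round replace those gathered during PreAccept.

```python
class Replica:
    def __init__(self, witnessed):
        self.witnessed = dict(witnessed)  # txn_id -> (t0, keys)
        self.accepted = {}                # txn_id -> (t, deps), kept for recovery
        self.promised = {}                # txn_id -> highest ballot seen

    def accept(self, txn_id, t0, t, keys, deps, ballot=0):
        if ballot < self.promised.get(txn_id, 0):
            return None  # a recovery coordinator has taken over this transaction
        self.promised[txn_id] = ballot
        self.witnessed[txn_id] = (t0, keys)
        self.accepted[txn_id] = (t, set(deps))
        # New deps: conflicting transactions witnessed with a t0 lower than t.
        return {other for other, (other_t0, other_keys) in self.witnessed.items()
                if other != txn_id and other_keys & keys and other_t0 < t}


def slow_path(txn_id, t0, keys, pre_accept_replies, replicas):
    t = max(reply_t for reply_t, _ in pre_accept_replies)
    proposed_deps = set().union(*(d for _, d in pre_accept_replies))
    replies = [r.accept(txn_id, t0, t, keys, proposed_deps) for r in replicas]
    acks = [d for d in replies if d is not None]
    if len(acks) <= len(replicas) // 2:
        return None  # no simple majority accepted; the caller must retry
    # The PreAccept deps are discarded; only the deps from this round are committed.
    return ("Commit", txn_id, t0, t, set().union(*acks))
```

The dependencies sent with Accept are stored on each replica only to help recovery of other transactions; the Commit carries the union of the dependencies returned in this round.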
Execution
The union of all dependencies received during consensus is derived before t is disseminated via Commit and simultaneously a Read is issued by C to a member of each participating shard (preferably in the same DC), with those dependencies known to participate in that shard attached. This replica waits for all dependencies to be committed before filtering out those that are assigned a later t. The remaining dependencies are waited on until they execute and their result applied on this replica, before the read is evaluated and returned to the coordinator. C combines these responses to compute an update and client response, which is then disseminated by Apply to all replicas and returned to the client (respectively).
Execution
Replica R receiving Commit(X, deps):
    Committed[X] = true
Coordinator C:
    send a read to one or more (preferably local) replicas of each shard
        (containing those deps that apply on the shard)
Replica R receiving Read(X, t, deps):
    Wait for deps to be committed
    Wait for deps with a lower t to be applied locally
    Reply with result of read
Coordinator C (with a response from each shard):
    result = execute(read responses)
    send Apply(result) to all replicas of each shard
    send result to client
Replica R receiving Apply(X, t, deps, result):
    Wait for deps to be committed
    Wait for deps with a lower t to be applied locally
    Apply result locally
    Applied[X] = true
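The two waiting rules on a replica can be sketched as a readiness check. This is an illustrative single-threaded model with hypothetical names (`Txn`, `can_apply`, `apply_ready`) and integer timestamps; a real replica would block or register callbacks rather than poll.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Txn:
    executes_at: Optional[int]              # None until Commit has been received
    deps: set = field(default_factory=set)  # ids of dependency transactions
    applied: bool = False


def can_apply(txn_id, state):
    txn = state[txn_id]
    for dep_id in txn.deps:
        dep = state.get(dep_id)
        if dep is None or dep.executes_at is None:
            return False  # wait for every dependency to be committed
        if dep.executes_at < txn.executes_at and not dep.applied:
            return False  # wait for lower-t dependencies to be applied
        # Dependencies committed with a later t are filtered out (ignored).
    return True


def apply_ready(state):
    """Apply every committed transaction whose waits are satisfied; return the order."""
    order, progressed = [], True
    while progressed:
        progressed = False
        for txn_id in sorted(state):
            txn = state[txn_id]
            if txn.executes_at is not None and not txn.applied and can_apply(txn_id, state):
                txn.applied = True
                order.append(txn_id)
                progressed = True
    return order
```

In this model a transaction Y that lists Z as a dependency stays blocked while Z is uncommitted, but proceeds as soon as Z commits with a later t, since Z can then no longer execute before Y.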
Recovery
Recovery of a transaction X is not terribly complicated. It assumes that it is executed by a replica R that has witnessed X. Replicas are contacted for their knowledge of the transaction. If they do not already know of X, they initially perform the same steps as they would in the fast path to record its arrival time. Then the replica consults its local state for any transactions that it has recorded that would have witnessed X if X had reached fast path consensus. If any are present that did not witness X this fact is returned to the coordinator. Any transactions with a lower t0 that have been proposed a higher t are also returned, alongside the replica’s other state for X.
R assembles a simple quorum, and picks the highest step to complete. That is to say, if it sees an Apply, Execute, Commit or Slow Path round it picks them up in that order of precedence. Otherwise it must decide if it should pick t0 or some t it found from the majority it contacted to propose in a new Slow Path round. The simple intuition here is that either it is safe to propose t0 or we know that we did not reach fast path consensus - this is determined by the replica responses indicating if they have witnessed a newer transaction that did not witness X. In this case we know X did not reach a fast path quorum else it would have been witnessed. If we witness no such transaction, then it is safe to propose t0 as every newer transaction knows of X and is waiting for it to commit. The only edge case here is a transaction with a lower t0 that is proposing a newer t but has not yet committed. If we get to this point, we must wait for these transactions to commit before retrying.
Recovery
Coordinator C:
    send Recover(X) to replicas of each shard
Replica R receiving Recover(X):
    Ensure X is PreAccepted; if only PreAccepted compute deps
    Wait = {Accepted transactions with lower t0 but higher t than X}
    Superceding = {Accepted transactions with higher t0 that did not witness X}
                ∪ {Committed transactions with higher t than t0 of X, that did not witness X}
    Reply (local state for X, Wait, Superceding)
Coordinator C (with a quorum of responses from each shard):
    If (any R in responses had X as Applied, Committed, or Accepted) then
        continue the state machine from there
    Otherwise
        If (any shard has insufficient R where t = t0 to have taken the fast path) then
            propose the highest t of any response on the Slow Path
        If (any R.Superceding is non-empty) then
            propose the highest t in any response on the Slow Path
        If (any R.Wait is non-empty) then
            wait for them to commit and retry
        Otherwise
            propose t0 on the Slow Path
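The recovery coordinator's decision can be sketched as a pure function over the replies. This is a hedged illustration for a single shard: `RecoverReply`, `STATUS_RANK` and `decide` are hypothetical names, field names mirror the pseudocode, and the test for "insufficient R where t = t0" is an assumed interpretation, namely that the fast path cannot have been taken once more replies reject t0 than the electorate could spare (electorate size minus fast path quorum size).

```python
from dataclasses import dataclass, field

STATUS_RANK = {"PreAccepted": 0, "Accepted": 1, "Committed": 2, "Applied": 3}


@dataclass
class RecoverReply:
    status: str                                    # most advanced local state for X
    t: tuple                                       # timestamp this replica holds for X
    wait: set = field(default_factory=set)         # the Wait set
    superceding: set = field(default_factory=set)  # the Superceding set


def decide(t0, replies, electorate_size, fast_quorum):
    best = max(replies, key=lambda r: STATUS_RANK[r.status])
    if best.status != "PreAccepted":
        # Some replica saw a later step: continue the state machine from there.
        return ("resume", best.status, best.t)
    highest_t = max(r.t for r in replies)
    rejections = sum(1 for r in replies if r.t != t0)
    if rejections > electorate_size - fast_quorum:
        return ("propose", highest_t)  # t0 cannot have reached a fast path quorum
    if any(r.superceding for r in replies):
        return ("propose", highest_t)  # a newer transaction did not witness X
    if any(r.wait for r in replies):
        return ("wait_and_retry",)     # an older transaction may still commit later
    return ("propose", t0)             # safe: every newer transaction knows of X
```

For five replicas with a fast path quorum of four, a single rejection among the replies leaves the fast path possible, so the sets returned by the replicas decide the outcome; two rejections rule the fast path out immediately.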
Test Plan
This work will be tested extensively using a combination of isolated randomised testing of the standalone library, verification of the standalone library using Jepsen.io tooling, dedicated linearizability and serializability verification of the standalone library, and similar facilities in situ in Cassandra using the Simulator facility coming with CEP-10.