ID | IEP-91 |
Author | |
Sponsor | |
Created | |
Status | ACTIVE |
If I have seen further, it is by standing on ye shoulders of Giants
Isaac Newton
One of the significant features of Apache Ignite 3, as a distributed database, is the ability to execute multiple table operations as a single atomic operation, known as a transaction. We need to design a modern and robust distributed transaction protocol, considering current best practices. Compared to the previous release, we aim to support transactional SQL from the beginning and remove limitations like the limited size of a transaction.
In this section, I'll give some definitions encountered throughout the text. It can be used to quickly find the definition of a specific term.
Record (aka Row, Tuple) - a collection of attribute-value pairs.
Transaction - a sequence of logically related partially ordered actions (reads or writes) over the database objects.
Atomicity - a transaction property that declares that all actions are carried out, or none are.
Consistency - a property that moves a database from one consistent state to another after the finish. The meaning of the consistent state is defined by a user.
Isolation - a measure of mutual influence between interleaved transactions.
Durability - a transaction property that guarantees that the database state remains unchanged after a transaction is committed, despite any failures.
Schedule - a way of executing interleaved transactions.
Serial schedule - a schedule where all transactions are executed sequentially.
Serializable schedule - a schedule equivalent to some serial execution of interleaved transactions.
Concurrency control (CC) - a technique to preserve database consistency in case of interleaved committed transactions.
Multi-version concurrency control (MVCC) - a family of concurrency control techniques based on writing multiple record versions (copy-on-write).
Recoverable schedule - a schedule that is not affected by aborting some of the involved transactions. A transaction reads only committed values to achieve this.
Interactive transaction - a transaction whose operation set is not known a priori. It can be aborted at any time if not committed yet.
Cascading abort - a situation in which the abort of one transaction causes the abort of another dependent transaction to avoid inconsistency.
HTAP - hybrid transactional/analytical processing
SG - serializability graph
RO - the abbreviation for "read-only"
RW - the abbreviation for "read-write"
TO - the abbreviation for "timestamp ordering"
To define the key points of the protocol design, let's take a look at some features which can be provided by the product, and rate them from 1 to 3, where 3 means maximum importance for product success.
Let's evaluate each feature:
Here we take into account the isolation property of a transaction. The strongest isolation is known to be Serializable, implying that all transactions appear to execute sequentially. This is convenient for users because it prevents hidden data corruption and security issues. The price may be reduced throughput/latency due to increased overhead from the CC protocol. An additional option is to allow a user to choose a weaker isolation level, like SNAPSHOT. The ultimate goal is to implement Serializability without sacrificing performance too much, having Serializable as the default isolation level.
Score: 3
This is the natural way to use transactions.
Score: 3
Such transactions can be used to build analytical reports, which can take minutes, without affecting (and being affected by) concurrent OLTP load. Any read-only SQL SELECT query naturally maps to this type of transaction. Such transactions can also read snapshot data in the past, at some timestamp. This is also known as HTAP.
Score: 3
A handy feature for load balancing, especially in conjunction with the previous.
Score: 3
Some databases limit the number and total size of records enlisted in a transaction, because they buffer uncommitted read or written records temporarily. This is not convenient for a user.
Score: 2
Nice to have, can help with migration scenarios.
Score: 1
It's essential to know how many node failures we can tolerate before declaring unavailability due to temporary data loss (or full unavailability, in the case of an in-memory deployment). This is known as k-safety. More is better.
Score: 2
Looking at the evaluation, it's easy to notice that our protocol design is of the “one size fits all” type. This is an intentional choice, because Apache Ignite is intended to be a general-purpose database designed for commodity hardware that works “fine” out of the box in typical cases. Of course, there are cases where specialized solutions would work better. Additional optimizations and tuning capabilities can be introduced later.
Let’s define the key points of a design. It’s necessary to have:
The first requirement disables deterministic protocols like Calvin because they need to know the transaction read-write set in advance (or require the expensive reconnaissance step).
The second requirement implies a CC protocol, which allows non-conflicting execution of read and write transactions.
The third requirement implies a CC protocol that allows for serialized schedules.
The fourth requirement implies not buffering a whole transaction in memory or on disk.
The system also has to be horizontally scalable. To achieve scalability, the data will be partitioned using a hash or range partitioning method. The exact partitioning method is not essential for the purpose of this document. We treat a partition here as a synonym for a shard. Each partition is assigned to a cluster node and replicated to a predefined number of replicas to achieve high availability. Adding more nodes increases the number of partitions in the cluster (or reduces the number of partitions per node in case of static partitioning), thus increasing the scalability.
A transaction can span multiple partitions, making it distributed. Each partition is additionally replicated in multiple copies for durability. Providing atomicity on a commit is an additional difficulty in a distributed environment. Typically, this is achieved by using a two-phase commit protocol or its improved consensus-based version.
The performance aspect is not a central goal in the proposed design. We need a "good" level of performance at the beginning and a great level of usability. We can optimize later, following the step-by-step improvement of the product.
It turns out we want a Google Spanner clone. It seems it was designed with similar goals in mind. Other notable Spanner clones are CockroachDB and YugabyteDB.
Transaction protocol describes the interaction of nodes forming a transactional table store - an (indexed) distributed records store supporting operations with ACID guarantees.
It incorporates:
A typical transaction lifecycle looks like this:
The API entry point is the IgniteTransactions facade:
public interface IgniteTransactions {
    Transaction begin();

    CompletableFuture<Transaction> beginAsync();

    /**
     * Executes a closure within a transaction.
     *
     * <p>If the closure is executed normally (no exceptions), the transaction is automatically committed.
     *
     * <p>In case of a serialization conflict (or other retriable issue), the transaction will be automatically retried,
     * so the closure must be a "pure function".
     *
     * @param clo The closure.
     * @throws TransactionException If a transaction can't be finished successfully.
     */
    void runInTransaction(Consumer<Transaction> clo);

    <T> T runInTransaction(Function<Transaction, T> clo);

    <T> CompletableFuture<T> runInTransactionAsync(Function<Transaction, CompletableFuture<T>> clo);
}
public interface Transaction {
    /**
     * Synchronously commits a transaction.
     *
     * @throws TransactionException If a transaction can't be committed.
     */
    void commit() throws TransactionException;

    /**
     * Asynchronously commits a transaction.
     *
     * @return The future.
     */
    CompletableFuture<Void> commitAsync();

    /**
     * Synchronously rolls back a transaction.
     *
     * @throws TransactionException If a transaction can't be rolled back.
     */
    void rollback() throws TransactionException;

    /**
     * Asynchronously rolls back a transaction.
     *
     * @return The future.
     */
    CompletableFuture<Void> rollbackAsync();
}
To enlist an operation into a transaction, the Transaction instance must be passed to the corresponding store methods. Each method accepts a transaction argument. Consider, for example, a method for updating a tuple:
public interface TupleStore {
    /**
     * Inserts a record into the table if it doesn't exist or replaces the existing one.
     *
     * @param txn The transaction or {@code null} to auto commit.
     * @param tuple A record to insert into the table. The record cannot be {@code null}.
     */
    void upsert(@Nullable Transaction txn, Tuple tuple);

    CompletableFuture<Void> upsertAsync(@Nullable Transaction txn, Tuple tuple);

    /** Other methods here. */
}
Multiple operations accepting the same transaction instance are executed atomically within a passed transaction.
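For illustration, here is a minimal usage sketch of the API above. The "accounts" TupleStore instance, the "igniteTransactions" facade variable and the Tuple factory calls are assumptions made only for this example.

// A usage sketch: two upserts executed atomically within one transaction.
Transaction tx = igniteTransactions.begin();
accounts.upsert(tx, Tuple.create().set("accountId", 1L).set("balance", 500L));
accounts.upsert(tx, Tuple.create().set("accountId", 2L).set("balance", 1500L));
tx.commit(); // Both updates become visible atomically.

// The same with the closure form: the transaction is committed automatically on normal return
// and retried on a serialization conflict, so the closure must be a pure function.
igniteTransactions.runInTransaction(txn -> {
    accounts.upsert(txn, Tuple.create().set("accountId", 1L).set("balance", 500L));
    accounts.upsert(txn, Tuple.create().set("accountId", 2L).set("balance", 1500L));
});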
In contrast to Apache Ignite 2, a transaction is not bound to any particular thread of control by design. It can be executed by any number of threads; conversely, one thread can execute any number of transactions.
Asynchronous API allows non-blocking transaction processing and provides the best performance.
Defines how the table data (including indexes) is split between data nodes. A correct partitioning scheme is essential for scalable cluster workloads. TBD partitioning IEP ref, add diagrams
A partition (or affinity) key is a set of attributes used to compute a partition. For example, a prefix set of a primary key can be used to calculate a partition. A table partition key matches its primary index partition key. A secondary index is not required to have a partition key - such indexes are implicitly partitioned (as table rows).
Data is assigned to nodes using some kind of hash function, calculated over a set of attributes. There are several approaches to hash partitioning with different trade-offs (consistent vs. rendezvous).
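A minimal sketch of modulo-based hash partitioning over an affinity key follows; the partition count and key representation are assumptions for illustration, not the actual Ignite partitioning function.

// A sketch: maps an affinity key to a partition id.
class HashPartitioning {
    static int partitionFor(Object affinityKey, int partitions) {
        int hash = affinityKey.hashCode();
        // Non-negative modulo, so the result is always a valid partition id.
        return Math.floorMod(hash, partitions);
    }
}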
The key space is split into range partitions (ranges) according to predefined rules (statically) or dynamically. Each range is mapped to a data node and represents a sorted map. A range search tree is used to locate the node owning a key. ORDER BY for such a table is just a traversal of the search tree from left to right, concatenating the ranges.
Data items are considered co-located if they have the same partition key. For example, two tables are co-located if they have the same partition key for their primary indexes.
Note that a table and an index can reside on the same data node, yet not be co-located.
If the index has no explicit partition key, its data is partitioned implicitly, the same as the PK partition key.
Additional information can be found here: IEP-86: Colocation Key
The data can be additionally locally partitioned (within a partition) to improve query performance and parallelism.
We will rely on a replication protocol to store data on multiple replicas consistently. Essentially, a replication protocol provides transactional semantics for a single partition update. We will build a cross partition transactional protocol on top of a replication protocol, as suggested in IEP-61.
The actual protocol type is not so important, because we don’t want to depend on specific protocol features - this would break the abstraction.
There are two types of replication protocols:
IEP-61 suggests using a majority based replication. When using a majority based replication, we tolerate f failed nodes from n total nodes (per replication group), where n >= 2f + 1.
Each partition forms an independent replication group, increasing availability.
CC is responsible for controlling the schedules of RO and RW transactions to be serializable. It defines the types of schedules allowed by concurrent transactions and is a key point of a protocol.
We need a store for storing rows and metadata in a data partition.
Indexing support is essential for a relational database. We will aim to support essential index types, like hash and sorted. Additional index types can be added later.
We need access to the information about tables and indexes during operation execution. The primary source of such information is the metastore. Each data node has an eventually consistent copy of the metastore. The metastore can be implemented as a system table stretched over all cluster nodes, so the data is available locally.
Before continuing further towards the discussion of CC protocol, let's dive a bit into the basics of serializability theory.
Example 1. Assume we have three transactions:
T1 = r1[x] r1[z] w1[x], T2 = r2[y] r2[z] w2[y], T3 = w3[x] r3[y] w3[z]
and two schedules:
S1 = w3[x] r3[y] w3[z] r2[y] r2[z] w2[y] r1[x] r1[z] w1[x]
S2 = w3[x] r1[x] r3[y] r2[y] w3[z] r2[z] r1[z] w2[y] w1[x]
We assume each transaction is committed. What can we tell about the serializability of S1 and S2? Recall the definition of a serializable schedule: to be serializable, it must be equivalent to some serial execution order of transactions T1, T2, and T3.
Two actions on the same data object, modified by different transactions, conflict if at least one of them is a write. The three anomalous situations can be described in terms of when the actions of two transactions T1 and T2 conflict with each other: in a write-read (WR) conflict, T2 reads a data object previously written by T1; we define read-write (RW) and write-write (WW) conflicts similarly. These conflicts can cause anomalies like dirty reads, unrepeatable reads, lost updates, and others.
S1 is obviously serial: it corresponds to the execution sequence T3, T2, T1. It is less obvious whether S2 is serializable. To prove it, we should find an equivalent serial schedule. We can attempt to swap non-conflicting operations (preserving the order of conflicting ones) until an equivalent serial schedule is produced.
w3[x] r1[x] r3[y] r2[y] w3[z] r2[z] r1[z] w2[y] w1[x] →
w3[x] r3[y] r1[x] r2[y] w3[z] r2[z] r1[z] w2[y] w1[x] →
w3[x] r3[y] r2[y] r1[x] w3[z] r2[z] r1[z] w2[y] w1[x] →
w3[x] r3[y] r2[y] w3[z] r1[x] r2[z] r1[z] w2[y] w1[x] →
w3[x] r3[y] w3[z] r2[y] r1[x] r2[z] r1[z] w2[y] w1[x] →
w3[x] r3[y] w3[z] r2[y] r2[z] r1[x] r1[z] w2[y] w1[x] →
w3[x] r3[y] w3[z] r2[y] r2[z] w2[y] r1[x] r1[z] w1[x]
So, S2 is serializable and equivalent to T3, T2, T1.
Schedules are called conflict equivalent if they can be converted one into another by swapping non-conflicting operations, which do not affect execution outcome. This also means they have the same order of conflicting operations. A schedule is called conflict serializable if it’s conflict equivalent to a serial schedule. Every conflict serializable schedule is serializable, but not vice versa. This class of schedules is defined as CSR.
It is useful to capture all potential conflicts between the transactions in a schedule in a precedence graph, also called a serializability graph (SG).
The precedence graph for a schedule S contains a node for each committed transaction in S, and an edge from Ti to Tj if an action of Ti precedes and conflicts with one of Tj's actions.
A schedule S is conflict serializable if and only if its precedence graph is acyclic. An equivalent serial schedule, in this case, is given by any topological sort of the precedence graph.
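A minimal sketch of a conflict-serializability test built directly from this definition follows; the Op record, the schedule encoding and the DFS-based cycle check are illustration-only assumptions.

// A sketch: builds a precedence graph from a schedule and checks it for cycles.
import java.util.*;

record Op(int tx, char type, String obj) {} // type: 'r' or 'w'

class PrecedenceGraph {
    static boolean isConflictSerializable(List<Op> schedule) {
        Map<Integer, Set<Integer>> edges = new HashMap<>();
        for (int i = 0; i < schedule.size(); i++) {
            for (int j = i + 1; j < schedule.size(); j++) {
                Op a = schedule.get(i), b = schedule.get(j);
                boolean conflict = a.tx() != b.tx() && a.obj().equals(b.obj())
                        && (a.type() == 'w' || b.type() == 'w');
                if (conflict) {
                    edges.computeIfAbsent(a.tx(), k -> new HashSet<>()).add(b.tx());
                }
            }
        }
        // The schedule is conflict serializable iff the precedence graph is acyclic.
        Map<Integer, Integer> state = new HashMap<>(); // absent = unvisited, 1 = on stack, 2 = done
        for (Integer v : edges.keySet()) {
            if (hasCycle(v, edges, state)) {
                return false;
            }
        }
        return true;
    }

    private static boolean hasCycle(int v, Map<Integer, Set<Integer>> edges, Map<Integer, Integer> state) {
        Integer s = state.get(v);
        if (s != null) {
            return s == 1;
        }
        state.put(v, 1);
        for (int next : edges.getOrDefault(v, Set.of())) {
            if (hasCycle(next, edges, state)) {
                return true;
            }
        }
        state.put(v, 2);
        return false;
    }
}

For the schedule S2 above, the resulting graph is acyclic, so the check succeeds; for the schedule in Example 2 below it reports a cycle between T1 and T2.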
Example 2. Consider the transactions:
T1 = r1[x] w1[x] c1, T2 = w2[x] c2
and a schedule: S = r1[x] w2[x] c2 w1[x] c1
S is not conflict serializable, because it's not equivalent to either T1, T2 or T2, T1.
Example 3. Consider the incrementing transactions:
T1=r1[x] w1[x], T2=r2[x] w2[x]
and a schedule: S = r2[x] r1[x] w2[x] w1[x]
S is not conflict serializable, because it's not equivalent to either T1, T2 or T2, T1.
Conflict serializability is sufficient but not necessary for serializability. A more general condition is view serializability.
Intuitively, two schedules are equivalent if the executions that produced them leave the database in the same state. The formal definition of equivalence is:
Two schedules are view equivalent if and only if they contain the same operations of the same transactions, every read operation reads from the same write operation in both schedules, and both schedules have the same final writes.
We say a read operation j reads from a write operation i if read operation j reads a value most recently written by write operation i. The final write of a schedule is the schedule's last write operation.
A schedule is view-serializable if it is view equivalent to some serial schedule. Every conflict serializable schedule is view-serializable, although the converse is not true. This class of schedules is defined as VSR.
It can be shown that any view serializable schedule that is not conflict serializable contains a blind write. A blind write is simply a write performed by a transaction without a preceding read of the same object.
Enforcing or testing view serializability turns out to be much more expensive, and the concept therefore has little practical use.
TBD example of view serializable schedule but non CSR
Until now, we have talked about schedules containing only committed transactions. We must also take into consideration schedules containing aborted transactions (which can also happen due to an internal error or a crash), which brings up the notion of recoverability for a schedule.
Example 4. Consider two transactions
T1 = w1[x] a1, T2 = r2[x] w2[x] c2
and S = w1[x] r2[x] w2[x] c2 a1
Such a schedule is unrecoverable because, once T2 is committed, it cannot be undone when T1 aborts.
Example 5. Consider two transactions:
T1 = w1[x] c1, T2 = w2[x] a2
and S = w2[x] w1[x] a2 c1
Here w1[x] will be undone by a2 (which restores the value of x as it was before w2[x]), causing a lost update for T1.
If, in a schedule, each transaction commits only after all transactions whose changes it has read have committed, then such a schedule is called a recoverable schedule. Recoverability is an essential feature and a must-have.
If transactions in a schedule read only data written by already committed transactions, the schedule is called cascadeless. Writes over uncommitted data are still possible. Such schedules avoid cascading aborts.
A strict schedule only reads or writes conflicting data written by already committed transactions. Strict schedules are recoverable, do not require cascading aborts, and actions of aborted transactions can be undone by restoring the original values of modified objects. This greatly simplifies the recovery procedure.
Example 6:
T1 = r1[x] w1[x] c1, T2 = r2[x] w2[x] c2
S = r1[x] r2[x] w1[x] c1 w2[x] c2
So, S is a strict schedule which is not serializable.
This theoretical background will come in handy when reasoning about the CC protocol.
The reader can get additional insight into serializability theory from these books: Transactional Information Systems, Transaction Processing, Database Management Systems.
CC protocols fall into three main types:
CC protocols are often combined with multi-versioning to improve their characteristics.
This is the most widely used protocol. There are two rules: a transaction must acquire a shared lock before reading and an exclusive lock before writing an object, and it may not acquire any new lock after it has released a lock (the growing and shrinking phases).
The 2PL lock matrix looks like this:
Lock requested by T2 \ Lock held by T1 | S | X |
S | + | - |
X | - | - |
This protocol produces recoverable conflict serializable schedules, but allows cascading aborts, which complicates a recovery procedure.
2PL is deadlock-prone. There are different approaches to deadlock handling:
Concurrency may not be the best for 2PL, and the number of held locks can be quite high. This can be mitigated by using MGL locking together with lock escalation.
Starvation is also an issue for a lock-based CC, if badly implemented:
2PL is also subject to lock thrashing - at a certain point of contention it stops making progress.
Another problem - slow or hanging transactions can block others' progress (not an issue with non-blocking OCC) - can be mitigated with timeouts.
This is a single-version protocol - no data is copied anywhere during a transaction.
Write lock time can be reduced by buffering writes locally until the commit.
Doesn’t provide a commit ordering property. Provides recoverability and cascadelessness.
This protocol is strict and provides a commit ordering property.
Proposed by Bernstein and Goodman. Each transaction is assigned a timestamp at startup, and the protocol ensures, at execution time, that if an action Ai of transaction Ti conflicts with an action Aj of transaction Tj, then Ai occurs before Aj if TS(Ti) < TS(Tj).
If an action violates this ordering, the transaction is aborted and restarted.
To implement this CC scheme, every database object O maintains a last read timestamp RTS(O) and a last write timestamp WTS(O).
If transaction T wants to read object O:
If transaction T wants to write object O:
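A minimal sketch of the basic T/O read/write rules described above; the per-object RTS/WTS fields and the abort-by-exception convention are assumptions for illustration.

// A sketch of basic timestamp ordering, assuming each object carries RTS(O) and WTS(O).
class TimestampOrdering {
    static class DbObject {
        long rts; // last read timestamp, RTS(O)
        long wts; // last write timestamp, WTS(O)
        Object value;
    }

    /** Read rule: T must not read a value written by a "future" transaction. */
    static Object read(DbObject o, long ts) {
        if (ts < o.wts) {
            throw new IllegalStateException("abort and restart with a new ts"); // read arrived too late
        }
        o.rts = Math.max(o.rts, ts);
        return o.value;
    }

    /** Write rule: T must not overwrite a value already read or written by a "future" transaction. */
    static void write(DbObject o, long ts, Object value) {
        if (ts < o.rts || ts < o.wts) {
            throw new IllegalStateException("abort and restart with a new ts"); // write arrived too late
        }
        o.value = value;
        o.wts = ts;
    }
}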
Unfortunately, the timestamp protocol presented above permits schedules that are not recoverable, as follows from the example:
T1 = w1[x] a1, T2 = r2[x] w2[x] c2, TS(T1) = 1, TS(T2) = 2
and S = w1[x] r2[x] w2[x] c2 a1 - which is unrecoverable.
To make T/O recoverable, it’s necessary to prevent a transaction from reading uncommitted values (for example, by buffering writes until a commit, by aborting a reading txn, or by locking).
A similar issue exists with repeatable reads: r1[x] w2[x] r1[x]
A transaction is assigned a monotonic ts at start, and all transactions are ordered according to the timestamp. This kind of CC requires knowing the read/write set a priori (or doing a reconnaissance step).
However, it is a good fit for an “atomic” cache, if we decide to have one.
This type of CC aborts transactions that would create cycles in the serialization graph (SG).
Maintaining the SG and testing it for cycles may be expensive, but recent work shows that this is a viable solution on modern hardware.
The protocol tracks the read/write sets of each transaction and stores all of its write operations in a private workspace (the Read phase). When a transaction commits, the system determines whether that transaction’s read set overlaps with the write set of any concurrent transactions (the Validation phase). If no overlap exists, then the protocol applies the changes from the transaction’s workspace to the database (the Write phase); otherwise, the transaction is aborted and restarted.
There are two types of validation:
Execute a transaction's validation and write phases together as a critical section: while ti is in its val-write phase, no other tk can enter its val-write phase.
BOCC validation of tj:
compare tj to all previously committed ti
accept tj if one of the following holds: ti finished its write phase before tj started its read phase, or WS(ti) ∩ RS(tj) = ∅ and ti finished its write phase before tj entered its validation phase
In other words, we test if previously committed transactions made the current transaction non serializable by invalidating its reads. The transaction serialization order matches the validation order.
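A minimal sketch of the BOCC check described above; the per-transaction bookkeeping fields are assumptions for illustration.

// A sketch of BOCC validation, assuming per-transaction read/write sets and phase timestamps.
import java.util.Collections;
import java.util.List;
import java.util.Set;

class Bocc {
    static class Txn {
        Set<Object> readSet;
        Set<Object> writeSet;
        long readPhaseStart; // when the transaction started its read phase
        long writePhaseEnd;  // when a committed transaction finished its write phase
    }

    /** Returns true if tj may commit, given the previously committed transactions. */
    static boolean validate(Txn tj, List<Txn> committed) {
        for (Txn ti : committed) {
            // Transactions that finished before tj even started reading cannot invalidate its reads.
            if (ti.writePhaseEnd <= tj.readPhaseStart) {
                continue;
            }
            if (!Collections.disjoint(ti.writeSet, tj.readSet)) {
                return false; // ti overwrote something tj has read: abort tj.
            }
        }
        return true;
    }
}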
Execute a transaction's val-write phase as a strong critical section: while ti is in its val-write phase, no other tk can perform any steps.
FOCC validation of tj:
compare tj to all concurrently active ti (which must be in their read phase)
accept tj if WS(tj) ∩ RS*(ti) = ∅, where RS*(ti) is the current read set of ti
FOCC is much more flexible than BOCC - upon unsuccessful validation of tj it has three options:
In other words, we test whether the reads of currently active transactions would be invalidated by the currently committing transaction, if it commits. The transaction serialization order matches the validation order.
Optimistic CC was first described in the Kung-Robinson paper.
Uses different CC for handling r-w-r and w-w conflicts.
Mentioned in Bernstein and Goodman paper.
This family of protocols tracks “dangerous” anti-dependencies (r-w conflicts) in the SG on top of a weaker isolation level and aborts transactions that could possibly create a cycle.
In this section I will evaluate some of the existing CC protocols. Most of them are intended for multi-core systems, but some are developed specifically for distributed shared-nothing systems.
This protocol protects a version chain head from concurrent access with a 2PL protocol. A beginTs is assigned at the start of a tx for deadlock prevention purposes. A new version is created under a X lock by a writing transaction. A commit timestamp is assigned on a pre-commit phase (at a lock point) and a version chain head is rewritten to reflect it, before releasing a lock. This commitTs is used solely for snapshot reads.
The idea is to use a version chain instead of maintaining a private transaction workspace.
A transaction starts in a read phase. Upon reading the latest version, the DBMS adds this record to the transaction’s read set. A transaction cannot read a new version until the other transaction that created it commits. A transaction that reads an outdated version will only find out that it should abort in the validation phase, if a writing tx was committed earlier.
T is allowed to update version V if its write lock is not acquired. In a multi-version setting, if the transaction updates version V, then the DBMS creates version V+1 with its txnId set to ID.
When a transaction instructs the DBMS that it wants to commit, it then enters the validation phase. First, the DBMS assigns the transaction another timestamp (commitTs) to determine the serialization order of transactions. The DBMS then determines whether the tuples in the transaction’s read set were updated by a transaction that has already committed. If the transaction passes these checks, it then enters the write phase, where the DBMS installs all the new versions and sets their beginTs to commitTs and endTs to INF.
In MVOCC, reads and writes all happen at the commitTs. Reads do not update any fields in version chain entries. Write-write conflicts can be prevented by locks.
Proposed by Bernstein and Goodman.
A transaction is assigned a monotonic ID at the beginning. The idea is to order all transactions according to this pre-assigned ID. We need an additional field, readTs, for tracking reads. T is allowed to read a version if its write lock is not held by another active transaction (i.e., the value of txId is zero or equal to ID), because MVTO never allows a transaction to read uncommitted versions (T is aborted otherwise). Upon reading the version (by searching for the corresponding beginTs, endTs range), the DBMS sets its read-ts field to ID, if its current value is less than ID. Otherwise, the transaction reads an older version without updating this field.
txId serves as a writeTs. A transaction always updates the latest version of a tuple. Transaction T creates a new version if the latest version's write lock is not held by another active transaction and ID is greater than the latest version's read-ts.
If these conditions are satisfied, then the DBMS creates a new version and sets its txId to ID. When T commits, the DBMS sets the new version's beginTs and endTs fields to ID and INF (respectively), and the previously committed version's endTs field to ID.
Subject to starvation.
Was proposed as a critique of the ANSI SQL isolation levels. In snapshot isolation, each transaction is given a snapshot, from which it reads at beginTs and to which it writes at commitTs. Write-write conflicts are prevented using a first-committer-wins rule. It doesn’t provide serializability, but protects from all ANSI SQL anomalies. Moreover, reads are never blocked by writes and vice versa. There is a variant with a first-updater-wins rule, which detects conflicts earlier (implemented by Oracle, for example).
This is a validation based CC. It aborts a reading transaction if a previous write transaction has invalidated its ReadSet. The difference to MVOCC is that reads and writes are happening at different timestamps.
Supports read-only transactions out of the box. Doesn’t require pre-declaration of RO transactions - RO txns do not need to be validated, because they execute on a snapshot. For RO transactions, the read set is passed as empty in the algorithm below. Large read-mostly transactions can starve - this can be mitigated by careful application design.
# Commit request (Ts(Ti), WS, RS) : {commit, abort}
1: for each row r ∈ RS do
2:     if lastCommit(r) > Ts(Ti) then
3:         return abort;
4:     end if
5: end for
# Commit Ti
6: Tc(Ti) ← TimestampOracle.next();
7: for each row r ∈ WS do
8:     lastCommit(r) ← Tc(Ti);
9: end for
10: return commit;
Let's take a brief look at SSN (serial safety net), which is a more recent and efficient implementation of a certifier than SSI. It is deployed on top of any other CC protocol (SSI works only over SI isolation), which forbids WW and WR conflicts. Its goal is to prevent a cycle from appearing in a serialization graph G by a committed transaction using some validation rules, similar to OCC.
Assume a dependency edge in G: U ← T.
In this relation U is a predecessor, and T is a successor.
A central concept is the relationship between the partial order of transactions that G defines, and the total order defined by their commit times. At the moment transaction T enters pre-commit, we take a monotonically increasing timestamp, and call it c(T). An edge in G is a forward edge when a predecessor is committed first in time, and a back edge when a successor is committed first. A forward edge can be any type of dependency, but (for the types of CC algorithms we deal with, which enforce write isolation) back edges are always read anti-dependencies where the overwrite is committed before the read.
In addition to the commit timestamp c(T) of transaction T, SSN associates T with two other timestamps: p(T) and n(T), which are respectively the low and high watermarks used to detect conditions that might indicate a cycle in the dependency graph G if T is committed. We define p(T) as the commit time of T’s oldest successor reached through a path of back edges, n(T) as the commit time of the most recently committed direct predecessor of T.
If p(T) <= n(T), transaction T must be aborted, because otherwise a cycle may appear when T commits. This is called an exclusion window violation.
This inequality actually means the direct predecessor can’t be among the successors of T, thus avoiding a cycle. The same is true for any predecessor of a direct predecessor.
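A minimal sketch of the SSN exclusion-window test at pre-commit; the watermark bookkeeping fields are assumptions for illustration.

// A sketch of the SSN exclusion window check, assuming p(T) and n(T) are maintained
// from back edges and committed direct predecessors as described above.
class Ssn {
    static class Txn {
        long commitTs;                       // c(T), taken at pre-commit
        long lowWatermark = Long.MAX_VALUE;  // p(T): oldest successor reached via back edges
        long highWatermark = Long.MIN_VALUE; // n(T): most recently committed direct predecessor
    }

    /** Returns true if T violates the exclusion window and must be aborted. */
    static boolean violatesExclusionWindow(Txn t) {
        return t.lowWatermark <= t.highWatermark;
    }
}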
Committing T5 in the picture above would close the dependency cycle.
SSN can give false positives, because no full SG information is kept, so some valid schedules are rejected.
Has safe retry property.
Delta-storage multi-version CC, capable of HTAP workloads. Details.
OCC type, single version CC with dynamic timestamp allocation, designed specifically for distributed systems. Uses private space for buffering writes. Details.
MVOCC type, prevents any waiting during the normal transaction phase. Details.
MVOCC type, avoids all shared-memory writes for records that were only read. Uses generations for timestamping versions. Details.
This protocol calculates the timestamp of each transaction lazily at the transaction’s commit time by inspecting the tuples it accessed. This is the main difference from other OCC schemes, where commitTs is assigned in a centralized manner.
To encode the serialization information in the tuples, each data version in TicToc has a valid range of timestamps bounded by the write timestamp (wts) and the read timestamp (rts). A read always returns the version valid at that timestamp and a write is ordered after all the reads to older versions of the same tuple:
Foreach v in ReadSet: v.wts <= commitTs <= v.rts
Foreach v in WriteSet: v.rts < commitTs
# Read phase: each read and write operation copies the latest committed tuple to the private
# transaction’s store, together with wts and rts.
# Data: read set RS, tuple t
1 r = RS.get_new_entry()
2 r.tuple = t
  # Atomically load wts, rts, and value
3 r.value = t.value, r.wts = t.wts, r.rts = t.rts
# In the validation phase, a transaction is tested against the aforementioned invariants.
# Step 1 – Lock Write Set
1  for w in sorted(WS) do  # sort WS to avoid deadlocks
2      lock(w.tuple)
3  end
# Step 2 – Compute the Commit Timestamp, it defines the serialization order
4  commit_ts = 0
5  for e in WS ∪ RS do
6      if e in WS then
7          commit_ts = max(commit_ts, e.tuple.rts + 1)
8      else
9          commit_ts = max(commit_ts, e.wts)
10     end
11 end
# Step 3 – Validate the Read Set (the write set is protected by locks from concurrent changes)
12 for r in RS do
13     if r.rts < commit_ts then  # some other transaction may possibly invalidate the read tuple
           # Begin atomic section on r.tuple - validate the r-w conflict
14         if r.wts != r.tuple.wts or (r.tuple.rts ≤ commit_ts and isLocked(r.tuple) and r.tuple not in WS) then
15             abort()
16         else
17             r.tuple.rts = max(commit_ts, r.tuple.rts)  # the entry was valid during the read, so we can advance rts to commit_ts
18         end
           # End atomic section
19     end  # other transactions can read the tuple in parallel using the same (or a higher) commit_ts
20 end
1 for w in WS do
2     write(w.tuple.value, w.value)
3     w.tuple.wts = w.tuple.rts = commit_ts
4     unlock(w.tuple)
5 end
This is a deterministic TO type, where data is split into shared-nothing partitions, and each thread manages a single partition to reduce CC overhead to almost zero. A transaction is assigned a monotonic ts at start, and all transactions are ordered according to the timestamp. A single-partition transaction is executed without any concurrency checks. Such a scheme tends to degrade if the load is not “partitionable” enough, or one partition is overloaded. Details.
Pessimistic multi-version CC, performs well both in high and low contention scenarios. Details.
Deterministic CC. Details.
CRDB uses MVTO with optimistic timestamp advancement in case of order violation to reduce the restart cost. Details.
There is an attempt to integrate MaaT into CRDB.
This is not an exhaustive list of known CC protocols. It would be good to continue the study and discover more interesting proposals, as the research is ongoing.
To make a CC distributed we should take into account three additional things:
Unfortunately, not every CC protocol can be effectively partitioned. For example, some OCC protocols require a monotonic timestamp at the begin/commit of a transaction to define the serialization order (or some global critical section during validation, which serves the same purpose). This gets complicated in a distributed system. A so-called timestamp oracle is required to issue timestamps in monotonic order, but it has several innate disadvantages, like limited scalability, availability issues, and STW recovery TBD references. We aim to avoid centralized timestamp allocation in our implementation.
Note that it’s not possible to assign commitTs per partition, because validation requests can be reordered and each partition would receive its own serialization order, breaking serializability.
Consider the basic OCC example (with delayed writes) without a proper synchronization:
T1=r1[x] w1[y] c1
T2=r2[y] w2[x] c2
If each partition is committed independently, the following histories can be accepted:
H(x): r1[x] w2[x] c1 c2 => T1 commits first with earlier timestamp
H(y): r2[y] w1[y] c2 c1 => T2 commits first with an earlier timestamp, r2 doesn’t see w1 because it operates on a snapshot.
Another example - let’s check if we can use Write SI in a decentralized environment. We assume each transaction is controlled by dedicated coordinator nodes, assigning timestamps independently.
T1 = r1[x] r1[x], beginTs=2
T2 = w2[x] beginTs=1 commitTs=2
H = r1[x] w2[x] c2 r1[x] - w2 invalidates T1 snapshot. This is not possible if beginTs and commitTs are monotonic.
So, we have to exclude all protocols based on centralized ordering, because centralized ordering has inherent limitations. The pre-commit validation can also be performed on a dedicated server, but this brings the same availability and scalability issues.
Additionally, we will exclude all SGT-based protocols, because maintaining a distributed SG seems costly. Recent research shows that SGT-based schedulers can perform well, but does not show a complete solution for a distributed system. This is a subject for additional research.
So, the only viable candidates are:
We consider multi-version variants of these protocols to minimize the influence between read-only and read-write transactions. Read-only transactions will utilize previous versions of entries.
The atomic commit feature can be added to any CC using a 2PC protocol. 2PC has known availability and latency issues TBD refs, so we try to avoid them by doing the commit using State Machine Replication. We assign a dedicated replication group to handle a transaction's finish state.
Data replication is easily integrated into any CC by delaying a commit until all data is successfully replicated.
This is the most interesting part. There are some great papers evaluating CC protocols, both local and distributed. We will consider the results presented in these papers while making a decision.
Some observations are:
Based on that, we define two types of transactions: RW and RO, where read-only is a special type of a transaction, pre-declared not to have any writes. RO transactions will attempt to use multiple versions of data to minimize interference with writes.
Let's investigate the possibilities for RW CC. RW transactions read up-to-date records and bring serializability issues.
As was highlighted in a previous section, the only viable candidates are MV2PL, MVTO, and MVOCC with decentralized timestamp allocation, or a hybrid scheme combining other schemes.
Let’s define pros and cons for each protocol in relation to RW transactions.
Pros:
Cons
Pros:
Cons:
Pros:
Cons:
All protocols are viable, depending on the workload and environment. MV2PL looks the most general of all, at the cost of reduced throughput under high contention. It fits all design requirements well and adds little overhead under normal circumstances.
Locking is also used as part of other CC protocols, MaaT for example, and is useful to prevent phantoms using gap locks (see below) - the SSN paper suggests using this approach. This means we need locking to implement other CCs anyway.
The MV store should be almost the same for all aforementioned protocol families and will be mostly shared by implementations. So, the reasonable choice seems to be to start with the MV2PL implementation and evaluate other CC protocols later, covering more specific workloads like special hardware or very high contention. For example, Yugabyte supports an optimistic scheme with a pessimistic one in mind. We should design the core in a way that makes it possible to replace the protocol.
RO transactions will be implemented by executing over an immutable snapshot. The reason for this is to affect RW transactions (and be affected by them) in the minimal possible way. RO transactions can also be executed in the past (time travel queries), if we have enough history for this.
For valid snapshots we must assign timestamps to versions in monotonic order, so newer versions get a higher timestamp. It is also desirable to have timestamps close to the physical time of a commit for aforementioned “time travel” queries.
The serial order of transactions produced by CC protocol naturally defines the order of all versions written by transactions, which have conflicts.
A read-only transaction at a read timestamp T must prevent any other RW transaction from committing before this timestamp, or choose a read timestamp such that all other transactions can’t commit earlier, to enforce a stable snapshot. This guarantees snapshot immutability.
MV2PL naturally allows RO transactions by using multi-version snapshots and bypassing locks for RO transactions.
The phantom anomaly looks like this:
H = r1[P] w2[x ⊂ P] c2 r1[P] .., P is a predicate.
It’s almost the same as the non-repeatable read anomaly. Multi-versioning by itself doesn’t help against phantoms. It must be combined with a CC protocol to prevent such schedules.
Some known phantom protection techniques:
MV2PL fits well with the next-key locking scheme.
This section describes the data structures used for storing table data. Each partition will have its own set. These data structures are more or less the same independently of the CC protocol.
The basic data structure for MVCC is a version chain. It maintains a list of versions for a record. Various representations of a version chain are possible, as described in the paper An Empirical Evaluation of In-Memory Multi-Version Concurrency Control. It states that the N2O scheme performs well, so we will follow this path. A single chain entry may look like this:
tuple | beginTs | endTs | txId | nextPtr |
val:100 | 1000 | INF | NULL | NULL |
Other MVCC-based protocols can add their own fields to the basic chain entry structure.
Here is the example of a version chain (from newest to oldest) with uncommitted (at the head) and committed versions:
tuple | beginTs | endTs | txId | nextPtr |
val:100 | NULL | NULL | 12345 | Ref to next row |
val:200 | 1000 | INF | NULL | NULL |
Here is the example of a version chain (from newest to oldest) with two committed versions:
tuple | beginTs | endTs | txId | nextPtr |
val:100 | 2000 | INF | NULL | Ref to next row |
val:200 | 1000 | 2000 | NULL | NULL |
We call the most up-to-date record the version chain head.
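A minimal sketch of a single N2O chain entry matching the layout above; the class and field naming is an assumption for illustration.

// A sketch of a version chain entry (newest-to-oldest), per the layout above.
import java.util.UUID;

class VersionChainEntry {
    Tuple tuple;               // the row payload for this version
    Long beginTs;              // commit timestamp of this version; null while it is an uncommitted write intent
    Long endTs;                // commit timestamp of the superseding version; null stands for INF
    UUID txId;                 // id of the writing transaction for an uncommitted version, null otherwise
    VersionChainEntry nextPtr; // link to the next (older) version, null for the oldest one
}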
Storing the full set of attributes in the version chain is not space efficient. It may be worth investigating other row storage schemes, like per column updates.
The row store maps a record’s internal unique RowId to its version chain:
RowId -> version chain HEAD
The RowId here is some internal unique identifier of a row. For example, a UUID can be used for this purpose. The rowId is generated when a record is inserted into the RowStore. RowId must provide an efficient way to look up a row, depending on the underlying storage.
A RowId must contain a pre-computed partitionId for unicast lookup of a row at any node, because table rows are assumed to be co-located with a primary key.
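A minimal sketch of such a RowId; the exact layout is an assumption for illustration.

// A sketch of a RowId carrying a pre-computed partition id plus a unique part.
import java.util.UUID;

record RowId(int partitionId, UUID uniqueId) {
    static RowId generate(int partitionId) {
        return new RowId(partitionId, UUID.randomUUID());
    }
}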
Secondary indexes (indexes over record attributes, including the primary key) can speed up certain queries by reducing the number of scanned rows: a secondary index is scanned first if it has strong selectivity.
For example, a user may have a unique email address and can be searched very fast by a secondary index on the email field.
A secondary index over a primary key is also known as a primary index. Strictly speaking, it’s not required (because a table row can be fetched by RowId), but is necessary for efficient retrieval by the key.
The paper An Empirical Evaluation of In-Memory Multi-Version Concurrency Control covers various indexing schemes. We will aim to use an indirection scheme, because it’s easy to implement and performs well.
In this scheme records in a SecondaryIndex have the following structure:
secondaryKey -> RowId for unique secondary index
secondaryKey -> Cursor<RowId> for non-unique secondary index
We define two types of indexes - HASH and SORTED. Other index types (like bloom filters) can be added later:
Let’s consider an example. Assume the employee table has (name, deptId) fields and is indexed by department id (deptId -> rowId).
INSERT INTO employee (name, deptId) VALUES (‘test’, 10); // Creates a row with RowId=ABC
This creates the committed version:
addr | value | beginTs | endTs | txId | nextPtr |
X | ( “test”, 10) | 1000 | INF | NULL | NULL |
The secondary index at this point contains one entry:
10 -> ABC
Then the employee is moved from department 10 to department 11:
UPDATE employee set deptId = 11 where RowId=ABC
The version chain for this transition will look like this:
addr | value | beginTs | endTs | txId | nextPtr |
X | ( “test”, 11) | 2000 | INF | NULL | Y |
Y | ( “test”, 10) | 1000 | 2000 | NULL | NULL |
The secondary index at this point contains two entries, making it multiversion:
10 -> ABC // Valid for commitTs=1000
11 -> ABC // Valid for commitTs=2000
Note that we don’t remove a previous value from the indexed set.
Assume we want to do a scan over the secondary index at the timestamp T.
For T=1000 we will get:
10 -> resolve(ABC, 1000) -> (“test”, 10)
11 -> resolve(ABC, 1000) -> (“test”, 10) <= false positive
For T=2000 we will get:
10 -> resolve(ABC, 2000) -> (“test”, 11) <= false positive
11 -> resolve(ABC, 2000) -> (“test”, 11)
To exclude the false positive from the scan, an additional check is required - the index key must be compared with the indexed value during version chain traversal to retain only matched values:
10 -> resolve(ABC, 2000, 10) -> 10 != 11 -> skip
11 -> resolve(ABC, 2000, 11) -> (“test”, 11)
Pure index scans are not possible when using the indirection model.
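A minimal sketch of the version resolution with the false-positive filter described above, reusing the VersionChainEntry sketch from earlier; the method signature and the Tuple.value(String) accessor are assumptions for illustration.

// A sketch: resolve a RowId's version chain at a timestamp and drop stale index matches.
class IndexScanHelper {
    /** Returns the tuple visible at ts, or null if the indexed field no longer matches searchKey. */
    static Tuple resolve(VersionChainEntry head, long ts, Object searchKey, String indexedColumn) {
        for (VersionChainEntry e = head; e != null; e = e.nextPtr) {
            if (e.beginTs == null) {
                continue; // uncommitted write intent, not visible to a snapshot scan
            }
            long end = e.endTs == null ? Long.MAX_VALUE : e.endTs;
            if (e.beginTs <= ts && ts < end) {
                // Re-check the indexed value to filter out index false positives.
                return searchKey.equals(e.tuple.value(indexedColumn)) ? e.tuple : null;
            }
        }
        return null;
    }
}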
The main benefit of this scheme is that it requires minimal updates to maintain secondary index consistency. For example, if the indexed object changes but some indexed field remains the same, the secondary index for this field will not be modified.
We can alter this scheme to support index-only scans by storing part of the version chain info in index values:
10, (ABC, 1000-2000)
11, (ABC, 2000-INF)
Assume the scan at T=2000
10 -> resolve(ABC, 1000-2000, 2000) -> skip
11 -> resolve(ABC, 2000-INF, 2000) -> (“test”, 11)
We can avoid resolving the version chain (and getting false positives) by using a liveness range check. But the cost is additional storage space and complexity. Another problem is additional write amplification on commit.
Indexes for uncommitted write intents will look like:
12, (ABC, txId)
They need to be rewritten (asynchronously) on commit, when the commitTs becomes known, causing additional I/O:
10, (ABC, 1000-2000)
11, (ABC, 2000-INF) -> 11, (ABC, 2000-3000) // Write amp!
12, (ABC, txId) -> 12, (ABC, 3000-INF) // Write amp!
This scheme doesn’t reduce the number of index scans, but avoids the version chain lookup for index-only queries (when the queried fields belong to the indexed set). The previous scheme seems more general, but can possibly hurt performance for read-intensive applications.
A similar scheme can also be implemented by writing liveness ranges to index keys, reducing the number of index scans at the cost of even more work at commit time. However, such a scheme works well with pre-ordered transactions (for example, in MVTO CC), because the commit timestamp is known a priori.
I suggest starting with the first approach as the simplest. The alternative scheme can be investigated later as an optimization, if some degradation is reported by benchmarks.
The row store and secondary indexes can be accessed concurrently, so it’s important for them to be thread safe.
We assume that all access to the latest values (timestamp=null) is made while holding proper read-write locks (to maintain the invariant - only one uncommitted version exists at a time). The locking details are described in the corresponding section.
Scans at the timestamp must be lock free.
One drawback of MVCC schemes is the unbounded growth of versions. To avoid storage overflow, old versions must be efficiently pruned.
Approaches to garbage collection are described in An Empirical Evaluation of In-Memory Multi-Version Concurrency Control.
Efficient GC is very important for product success. This part of the design will be provided in a separate document.
The proposed version chain implies the inlining of row attributes. This doesn’t work well when an attribute can’t be efficiently inlined, for example for BLOBs. In such a case, the attribute value can be stored in auxiliary storage and referenced by a pointer from the versioned value. A reference counter must be attached to an attribute value. When it reaches zero, the attribute can be removed from auxiliary storage.
BLOB columns can’t be indexed.
As was mentioned before, we want to avoid centralized timestamp assignment during a commit. But here is a good question: how do we assign timestamps, and what actually is a timestamp in a distributed system? There is a great paper describing a protocol for ordering the events in a distributed system using a logical timestamp close to the physical time - the hybrid logical clock (HLC). In a nutshell, this is a Lamport clock bound to physical time. It seems to be a natural choice for tracking transaction dependencies without creating additional version fields. The actual serialization order is defined by the CC protocol. We assume that our txn protocol carries the hybrid clock in each message and updates the local node’s clock according to the rules to track causality. See the paper for details. We will rely on HLC for timestamping row versions in the system.
There is a special role assigned to one of partition replicas, managed by a replication protocol. We assign a primary replica - it executes the CC protocol for RW transactions, so all reads and writes go through it.
Each primary replica is identified by a monotonic liveness interval (startTs, endTs). All such intervals are disjoint, so a new primary replica's liveness interval can’t overlap with the previous one. This means at most one primary replica for a partition can exist at a time. The most efficient implementation is to co-locate it with the leader for consensus-based replication protocols.
A primary replica role can be gracefully transferred to another replica. TBD
There is a window between two adjacent liveness intervals (due to the disjointness rule). The partition is not available for reads between these intervals (in addition to any unavailability related to replication).
Each transaction holds a primary replica lease for each enlisted partition. A lease is defined by the interval in which a transaction is allowed to perform reads from the corresponding partition. For long transactions, the lease is periodically refreshed - this is required for RW conflict avoidance during a node failure - see the Recovery section. This concept is similar to the primary partition in AI2. All read operations must go through the same primary replica, so a reading transaction can’t survive a primary replica failure. TBD replace lease word ?
Each data node, which can be enlisted into a transaction, stores local txn state in a volatile map:
txId -> (txState, txCoordAddr, commitTs)
This map is updated according to the RW protocol. A new entry is put when a partition is enlisted into a txn. The state is updated as soon as the txn is committed or aborted.
Entries are purged from the map using FIFO policy.
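A minimal sketch of such a volatile transaction state map; the names, the bound and the LinkedHashMap-based FIFO eviction are assumptions for illustration.

// A sketch of the per-node volatile txn state map: txId -> (txState, txCoordAddr, commitTs).
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

class LocalTxStateMap {
    enum TxState { PENDING, COMMITTED, ABORTED }

    record TxMeta(TxState state, String txCoordAddr, Long commitTs) {}

    private static final int MAX_ENTRIES = 100_000; // illustrative bound for the FIFO purge

    // LinkedHashMap in insertion order approximates the FIFO purge policy.
    private final Map<UUID, TxMeta> states = new LinkedHashMap<>() {
        @Override
        protected boolean removeEldestEntry(Map.Entry<UUID, TxMeta> eldest) {
            return size() > MAX_ENTRIES;
        }
    };

    void enlisted(UUID txId, String coordAddr) {
        states.put(txId, new TxMeta(TxState.PENDING, coordAddr, null));
    }

    void finished(UUID txId, boolean commit, Long commitTs) {
        TxMeta old = states.get(txId);
        String coord = old == null ? null : old.txCoordAddr();
        states.put(txId, new TxMeta(commit ? TxState.COMMITTED : TxState.ABORTED, coord, commitTs));
    }
}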
To execute the write intent resolution part of the CC protocol, we need to add a commit partition id field to the version chain. It matters only for uncommitted versions:
tuple | beginTs | endTs | txId | commitPartId | nextPtr |
val:100 | 1000 | INF | NULL | NULL | NULL |
Here is the description of the CC used for RW transactions. Locking is done using the SS2PL scheme (all locks are released after the commit). All requests/responses implicitly contain an HLC timestamp and assume HLC propagation; I will omit it for the sake of simplicity.
The OpRequest structure is:
txId:UUID
op:OperationType
keys:Array or Range
values:Array // Only for writes
commitPartId // Only for writes
The OpResponse structure is:
opCode:int // 0 - ok, !0 - error code
result:Array
readLeases:Map<partitionId, LeaseInterval>
The TxFinishRequest structure is:
txId:UUID
commit:bool
txCoordNodeId:NodeId
enlistedPartitions:Array // For recovery purposes
Assume a RW transaction involving two partitions. First operation reads from partition 1, second writes the received value to partition 2:
T=r[x], w[y], c
It uses HLC for timestamping written values, which is required for read-only transactions.
We’ll start with an example diagram, then provide a detailed description of the general case.
Locks can be acquired by a transaction in two modes:
Lock requested by T2 \ Lock held by T1 | S | X |
S | + | - |
X | - | - |
The lock compatibility matrix for basic 2PL locks
A partial order, called exclusivity order and denoted by ≤, is defined on the set of lock modes. For lock modes m1 and m2, m1 ≤ m2 means that m2 is at least as exclusive as m1, so that any action permitted by a lock of mode m1 is also permitted by a lock of mode m2. For the set {S;X}, we have S < X, that is, S ≤ X and S != X.
This is because X permits more actions (reading and writing) than S (only reading). In general, we assume that the set M of lock modes used is a lattice so that, for any {m1,m2} ∈ M, there exists a unique lock mode denoted sup{m1,m2} (the supremum of m1 and m2) that is the least exclusive mode in M that is at least as exclusive as both m1 and m2. We have sup{S,X} = X.
A lock upgrade occurs if a transaction T already holds a d-duration m-mode lock on a data item x and later requests a d’-duration m’-lock on x, where either m < m’ or no exclusivity order is defined between m and m’. If no other transaction holds on x a lock that is incompatible with m’, T is granted on x a d’-duration lock of mode sup{m,m’}.
A request to upgrade an m lock on x to an m’ lock yields (when allowed) an m’’ lock on x if the entry for (m’;m) in the lock-upgrade matrix contains the value m’’.
Using only these types of locks produces bad concurrency, so more advanced schemes were invented.
In what follows, we will use a form of locking called multi-granularity locking (MGL), which introduces a new type of locks called intention locks. Such locks are of three different modes: IS (intention-shared), IX (intention-exclusive), and SIX (shared + intention-exclusive). MGL defines a hierarchy of lock objects (for example, a table and a row in the table) and acquires different locks at different hierarchy levels. This approach helps to reduce the number of deadlocks and improve concurrency.
Mode | IS | IX | S | SIX | X |
IS | Yes | Yes | Yes | Yes | No |
IX | Yes | Yes | No | No | No |
S | Yes | No | Yes | No | No |
SIX | Yes | No | No | No | No |
X | No | No | No | No | No |
The lock compatibility matrix for MGL locks
Mode | IS | IX | S | SIX | X |
IS | IS | IX | S | SIX | X |
IX | IX | IX | SIX | SIX | X |
S | S | SIX | S | SIX | X |
SIX | SIX | SIX | SIX | SIX | X |
X | X | X | X | X | X |
The lock upgrade matrix for MGL locks
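A minimal sketch of these two matrices expressed as a lock mode lattice; this is an illustration, not the actual lock manager code.

// A sketch of the MGL lock modes with the compatibility and upgrade (supremum) matrices above.
enum LockMode {
    IS, IX, S, SIX, X;

    // Rows/columns follow the declaration order: IS, IX, S, SIX, X.
    private static final boolean[][] COMPAT = {
            {true,  true,  true,  true,  false}, // IS
            {true,  true,  false, false, false}, // IX
            {true,  false, true,  false, false}, // S
            {true,  false, false, false, false}, // SIX
            {false, false, false, false, false}, // X
    };

    private static final LockMode[][] SUP = {
            {IS,  IX,  S,   SIX, X},
            {IX,  IX,  SIX, SIX, X},
            {S,   SIX, S,   SIX, X},
            {SIX, SIX, SIX, SIX, X},
            {X,   X,   X,   X,   X},
    };

    /** True if this requested mode is compatible with a lock already held in the given mode. */
    boolean compatibleWith(LockMode held) {
        return COMPAT[ordinal()][held.ordinal()];
    }

    /** The least exclusive mode at least as exclusive as both modes, used for lock upgrades. */
    LockMode supremum(LockMode other) {
        return SUP[ordinal()][other.ordinal()];
    }
}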
We will use two granules: tables (high level granules) and records (low level granules).
When a transaction T explicitly wants to S- or X-lock a record x with a d-duration lock, it must first acquire the appropriate d-duration intention locks on a table for x. Thus, locks are acquired first on the table, then on the record. Locks are released in reverse order.
In the next sections we will see how proposed locks are acquired during row and index access.
The required locks on the row store are the following:
All locks are acquired on a per-partition basis.
First, we will define a locking protocol for a hash index. Such an index can be used for speeding up equality or IN queries (the “hash” index type). A unique instance of this index type can be used for the PK index (unique + not null).
Cursor has the only operation, @Nullable RowId next(), returning the next matching row id or null if none is found.
The update operation for an index doesn’t make sense in the current version of locking model and is not defined.
IX lock can be used for delete, but it does not increase concurrency.
To check the uniqueness of the index, we must exclusively lock the index key and check whether at least one up-to-date rowId is already mapped to this key. This has some complications due to the multi-version nature of the index - it holds multiple index keys for the same rowId, but only one rowId points to the up-to-date row for a unique index. The simplest solution is to iterate over all keys until an up-to-date entry is found. This can be optimized by storing up-to-date values first (for example, by sorting them by the commit timestamp).
Assume the table Employee(rowId, name, salary) and an index on salary.
First scenario is the simplest - inserting the conflicting values must fail:
TX1: id1 = insert [john, 100], TX1 // Insert tuple into row store
insert 100, id1 // Insert into index
commit id1, 10, TX1 // Commit data at timestamp 10
TX2: id2 = insert [bill, 100], TX2 // Insert tuple into row store
insert 100, id2 // Uniqueness violation is here.
Second scenario is more complicated and demonstrates multi-versioning issues:
TX1: id1 = insert [john, 100], TX1 // Insert tuple into row store
insert 100, id1 // Insert into index
commit id1, 10, TX1 // Commit data at timestamp 10
TX2: update id1, [john, 200], TX2 // Change salary for id1
insert 200, id1 // Insert into index, but previous index record is kept due to multiversioning!
commit id1, 20, TX2 // Commit data at timestamp 20
TX3: id2 = insert [bill, 100], TX3
insert 100, id2 // Insert into index
commit id2, 30, TX3 // Commit data at timestamp 30
TX3 will attempt to insert an already existing record into the index, but that record exists only at timestamp 10, which is not the up-to-date value. So, this insert MUST succeed.
The uniqueness for this index type is supported for an explicitly partitioned index.
The IX lock allows for concurrent insertion or deletion of different row ids for the same index key, because IX is compatible with IX. The S and X locks are not compatible with IX, so transactions are properly ordered.
Next, we will define a locking protocol for a sorted secondary index. The locking model is based on the ARIES/KVL model.
The scan actually consists of multiple next() operations. First, a position in the sort order is defined according to the “lower” condition - the cursor seeks to this position, if it exists and is below the upper bound. After that, the cursor moves over the keys until the upper bound is reached.
Consider the following index
(1, ID1)
(3, ID2),
(5, ID3)
and the scan(2, false, 4, false) operation.
A first call to next() (the seek operation) will position the cursor at the (3, ID2) element. A second next() will return NOT_FOUND.
The main difference from the previous index type is that we don’t know which key to lock until the next() call, which seeks a key in the tree matching the lower bound.
Let’s define:
The nextKey is also defined for insert and delete operations. This is the next key in the tree after the inserted or removed key.
The next key for (3,ID2) is (5,ID3). The next key for (5,ID3) is +INF.
The locking protocol looks as follows (we assume forward scans):
IX_commit for delete is used to allow concurrent insertions in the corresponding range.
Note that concurrent removal of the same rowId is not possible, because it is protected by the rowId lock.
An S lock on x is interpreted as a range lock on (x’, x], where x’ is the largest key before x, or -INF.
For example, scan(2, false, 4, false) must take S locks on keys 3 and 5. Note that this will falsely lock the insertion of key 4, but we can’t do anything about it due to the chosen conservative approach.
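A small illustrative sketch of the range locking performed by such a forward scan is shown below; the LockManager shape and lock-mode names are assumptions, and a TreeMap stands in for the sorted index:

// Sketch: S range locks taken by a forward scan over a sorted index (exclusive bounds).
// SortedScanLocking is illustrative only; the real index and lock manager APIs differ.
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;

final class SortedScanLocking {
    /** Marker for the +INF next key when the scan runs past the last entry. */
    static final Integer POSITIVE_INF = Integer.MAX_VALUE;

    interface LockManager {
        void acquireShared(UUID txId, Object lockKey); // S lock, commit duration
    }

    /**
     * Scans (lower, upper) exclusively and S-locks every traversed key plus the next key,
     * so the lock on key x also covers the gap (x', x].
     */
    static void scan(NavigableMap<Integer, UUID> index, int lower, int upper,
                     UUID txId, LockManager lockManager) {
        Integer key = index.higherKey(lower);
        while (key != null && key < upper) {
            lockManager.acquireShared(txId, key);
            System.out.println("visit " + key);
            key = index.higherKey(key);
        }
        // Lock the next key (or +INF) to protect the upper end of the scanned range.
        lockManager.acquireShared(txId, key != null ? key : POSITIVE_INF);
    }

    public static void main(String[] args) {
        NavigableMap<Integer, UUID> index = new TreeMap<>();
        index.put(1, UUID.randomUUID());
        index.put(3, UUID.randomUUID());
        index.put(5, UUID.randomUUID());

        // scan(2, false, 4, false): visits key 3 and S-locks keys 3 and 5, as in the example above.
        scan(index, 2, 4, UUID.randomUUID(), (tx, k) -> System.out.println("S lock on " + k));
    }
}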
The validation is performed as soon as an X_commit(currentKey) lock is acquired. The validation logic is simple: if at least one up-to-date duplicate rowId is found, the validation must fail.
Let’s consider the previous example.
TX1: id1 = insert [john, 100], TX1 // Insert tuple into row store
insert 100, id1 // Insert into index
commit id1, 10, TX1 // Commit data at timestamp 10
TX2: update id1, [john, 200], TX2 // Change salary for id1
insert 200, id1 // Insert into index, but previous index record is kept due to multiversioning!
commit id1, 20, TX2 // Commit data at timestamp 20
TX3: id2 = insert [bill, 100], TX3
insert 100, id2 // Insert into index
commit id2, 30, TX3 // Commit data at timestamp 30
TX3 will attempt to insert a record that already exists in the index, but that entry exists at timestamp 10, which is not the up-to-date value. So, this insert MUST succeed.
The uniqueness for this index type is supported for explicitly partitioned hash or range indexes.
IX_commit for delete is used to allow concurrent insertions in the corresponding range.
Insert uses the weaker IX lock. This is sufficient to prevent a read, update or delete of the inserted record, since IX conflicts with S and X. Since IX is compatible with IX, inserts in front of inserts are permitted. For deletes, the X next-key lock prevents inserts into the delete range.
When the instant-duration lock on the next key value is granted, Insert uses the return code from the lock manager to check whether the current transaction already held that lock in the X, SIX, or S mode. If the lock was held in one of those modes, then the key value being inserted must be locked in the X mode for commit duration. We call this lock state replication via next-key locking: we are essentially transferring a range lock from the next key to the current key. Otherwise, a commit-duration IX lock must be obtained on the key value being inserted. Note that nothing special needs to be done if the next key value is already held in the IX mode by the current transaction (i.e., the next key value is an uncommitted insert of the same transaction).
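The decision above can be sketched roughly as follows; the LockManager API, lock modes and durations are illustrative assumptions, not the final interface:

// Sketch: the insert decision described above.
import java.util.UUID;

final class IndexInsertLocking {
    enum LockMode { S, X, IX, SIX }

    interface LockManager {
        /** Acquires an instant-duration lock and reports modes already held by txId on that key. */
        java.util.Set<LockMode> lockInstant(UUID txId, Object key, LockMode mode);

        /** Acquires a commit-duration lock. */
        void lockForCommit(UUID txId, Object key, LockMode mode);
    }

    static void insertKey(UUID txId, Object currentKey, Object nextKey, LockManager lm) {
        // 1. Instant-duration IX lock on the next key; the return value tells us
        //    which modes this transaction already holds on the next key.
        java.util.Set<LockMode> heldOnNext = lm.lockInstant(txId, nextKey, LockMode.IX);

        // 2. Lock state replication via next-key locking: if the next key was held
        //    in X, SIX or S mode, the range lock is transferred to the inserted key.
        boolean transferRangeLock = heldOnNext.contains(LockMode.X)
                || heldOnNext.contains(LockMode.SIX)
                || heldOnNext.contains(LockMode.S);

        if (transferRangeLock) {
            lm.lockForCommit(txId, currentKey, LockMode.X);
        } else {
            lm.lockForCommit(txId, currentKey, LockMode.IX);
        }
        // Nothing special is needed if the next key is held in IX by the same txn
        // (an uncommitted insert of this transaction).
    }
}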
Why do we need an X lock instead of IX? The reasons are subtle. Let us consider an index with the key values 1, 2, 5, 10, 20, and consider the 3 scenarios:
S locks are acquired for all traversed keys, even if they belong to historical data and are not visible to the reader.
For implicitly partitioned indexes, the locking protocol must be executed on all data nodes for the table. This can lead to bad concurrency in certain cases.
The described algorithm allows concurrent inserts and deletes in non-unique indexes (assuming no concurrent reads).
It is conservative: sometimes unrelated locks prevent other locks from proceeding. For example, if a record is read directly by id (e.g. where a=1), it will block insertion into the range (a’, a], though it shouldn’t. This can be addressed by using gap locks.
Graefe’s paper suggests using varied granularity of locking modes to achieve good concurrency and low overhead. One call to a lock manager can acquire multiple locks using various lock modes. Key-value and gap locks are orthogonal. Such an approach can provide better concurrency, at the cost of a higher amount of held locks. This can be scheduled for future work.
Deadlock handling is described in the corresponding section. We can implement the WOUND_WAIT and TIMEOUT_WAIT modes for starters. The second mode complements careful cycle-free locking on the user side.
A deadlock detector does not seem necessary.
A nullable unique index can have multiple NULL entries, because according to the SQL standard NULL != NULL. So NULL key values are treated as in a non-unique index.
The algorithm for inserting a new row and its secondary index entries looks like this:
# Insert new row: (Tuple tuple, UUID txId)
The algorithm for updating a row and its secondary indexes looks like this:
# Find a row using an index (primary or secondary): (Index index, Tuple key, UUID txId)
# Update a row: (RowId rowId, Tuple tuple, UUID txId)
# Update secondary indexes: (RowId rowId, Tuple tuple, UUID txId)
Important: the delete methods are not called during transaction execution, to avoid physical deletion from the index; only the respective locks are acquired/released.
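A condensed, illustrative sketch of the update flow (locks, write intent, index maintenance) is given below; the interfaces and method names such as addWriteIntent are assumptions for the purpose of the example:

// Sketch of the update flow: lock, write an uncommitted version, maintain indexes.
import java.util.List;
import java.util.UUID;

final class RowUpdateFlow {
    interface LockManager {
        void xLock(UUID txId, Object lockKey);   // commit-duration exclusive lock
        void ixLock(UUID txId, Object lockKey);  // commit-duration intention lock
    }

    interface MvPartitionStorage {
        Object readLatest(Object rowId);                          // current committed/uncommitted tuple
        void addWriteIntent(Object rowId, Object newRow, UUID txId);
    }

    interface SecondaryIndex {
        Object key(Object row);                                   // extracts the indexed key
        void insert(Object key, Object rowId, UUID txId);
    }

    static void update(Object rowId, Object newRow, UUID txId,
                       MvPartitionStorage store, LockManager lm, List<SecondaryIndex> indexes) {
        // 1. Exclusive lock on the rowId protects the version chain.
        lm.xLock(txId, rowId);

        Object oldRow = store.readLatest(rowId);

        // 2. Install the write intent (uncommitted version) in the row store.
        store.addWriteIntent(rowId, newRow, txId);

        // 3. Maintain secondary indexes: insert new entries, lock (but do not delete) old ones.
        for (SecondaryIndex idx : indexes) {
            Object oldKey = idx.key(oldRow);
            Object newKey = idx.key(newRow);
            if (!oldKey.equals(newKey)) {
                idx.insert(newKey, rowId, txId);   // new entry, locked per the index protocol
                lm.ixLock(txId, oldKey);           // old entry is only locked; physical delete is deferred to GC
            }
        }
    }
}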
When an obsolete version is removed from a version chain, a corresponding index entry must be removed as well.
Consider the previous example:
TX1: id1 = insert [john, 100], TX1 // Insert tuple into row store
insert 100, id1 // Insert into index
commit id1, 10, TX1 // Commit data at timestamp 10
TX2: update id1, [john, 200], TX2 // Change salary for id1
insert 200, id1 // Insert into index, but previous index record is kept due to multiversioning!
commit id1, 20, TX2 // Commit data at timestamp 20
If the low watermark is above 10, the version chain entry is subject to garbage collection. We can remove the chain entry [john, 100] and the corresponding index entry [100, id1] while holding the X lock, which protects us from an incoming inserting transaction. A version chain can also be removed immediately if it has only one remaining entry - the tombstone. GC can be performed atomically, without participating in CC.
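A rough sketch of this garbage collection step is shown below, under the assumption of a pin-aware version chain; the Pins and GcIndex abstractions (and the removal-capable iterator) are invented for illustration:

// Sketch: garbage collection of obsolete versions and their index entries.
import java.util.Iterator;

final class VersionChainGc {
    record Version(long commitTs, Object row) {}

    interface GcIndex {
        Object key(Object row);
        void remove(Object indexKey, Object rowId); // physical removal, guarded by the index key X lock
    }

    interface Pins {
        /** True while some active RO transaction still reads this version (the "pin" ref counter). */
        boolean isPinned(Version v);
    }

    /**
     * Removes versions at or below the low watermark that are neither the latest committed
     * version nor pinned by an active RO reader; matching index entries go with them.
     * Assumes the chain iterator supports removal.
     */
    static void collect(Object rowId, Iterable<Version> chainNewestFirst, long lowWatermark,
                        Pins pins, GcIndex index) {
        Iterator<Version> it = chainNewestFirst.iterator();
        boolean latest = true;
        while (it.hasNext()) {
            Version v = it.next();
            if (latest) {              // the latest committed version is never removed
                latest = false;
                continue;
            }
            if (v.commitTs() > lowWatermark || pins.isPinned(v)) {
                continue;              // still in the liveness range or still in use
            }
            index.remove(index.key(v.row()), rowId);   // the index entry is removed together with the version
            it.remove();
        }
    }
}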
Some databases, for example CockroachDb, support transaction savepoints. For now, this is out of scope of this document.
Each transaction is assigned a deadline (derived from an explicit or default timeout), after which a commit is no longer possible. The txn coordinator watches the deadlines and preemptively aborts the transactions that exceed them.
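A minimal sketch of such deadline tracking on the coordinator might look as follows; the TxAborter callback and the data structures are assumptions:

// Sketch: the coordinator tracks per-transaction deadlines and aborts expired ones.
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class TxDeadlineWatcher {
    interface TxAborter {
        void abortAsync(UUID txId);
    }

    private final Map<UUID, Long> deadlines = new ConcurrentHashMap<>(); // txId -> deadline, epoch millis
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    TxDeadlineWatcher(TxAborter aborter, long checkPeriodMillis) {
        scheduler.scheduleAtFixedRate(() -> {
            long now = System.currentTimeMillis();
            deadlines.forEach((txId, deadline) -> {
                if (deadline <= now) {
                    deadlines.remove(txId);
                    aborter.abortAsync(txId); // commit is no longer possible after the deadline
                }
            });
        }, checkPeriodMillis, checkPeriodMillis, TimeUnit.MILLISECONDS);
    }

    /** Registers a deadline derived from the explicit or default timeout. */
    void register(UUID txId, long timeoutMillis) {
        deadlines.put(txId, System.currentTimeMillis() + timeoutMillis);
    }

    /** Called when the transaction finishes normally. */
    void unregister(UUID txId) {
        deadlines.remove(txId);
    }
}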
RW transactions can read from backup replicas, assuming a proper read lock is acquired on the primary replica and the entry is replicated. The lock response will install the proper HLC locally for waiting; see the replica reads section on how to ensure that. Beware: this might incur blocking if a local replica is lagging.
Here are scenarios in which you should use a locking read-write transaction:
An RO transaction is associated with a read timestamp at its beginning. It scans indexes to get access to table data via the indexed rowIds. Because it bypasses normal locking, rowIds must be filtered according to their visibility. This filtering process is known as write intent resolution and is crucial for RO transaction processing. Write intent resolution is required when the version chain corresponding to a rowId has an uncommitted version.
We expect RO txn serializability. The reading txn can “see” a write intent only if it is committed and its commit timestamp is <= the read timestamp. A repeated RO txn at the timestamp T must always return the same result set.
In a nutshell, the write intent resolution process ensures the aforementioned invariant. The read-only transaction performs a resolve(RowId rowId, Timestamp readTs) operation for each accessed rowId, which includes the following steps:
On receiving TxStateReq on the coordinator:
On receiving TxStateReq on commit partition:
Write intent resolution implies an important invariant: if a RO transaction has done a read at a timestamp T over partition P, no other active RW txn enlisted to this partition can later commit at a timestamp T0 <= T.
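A simplified sketch of the resolve operation described above is shown below; the TxStateResolver hides the TxStateReq round trips to the coordinator and the commit partition, and all names are illustrative assumptions:

// Sketch: visibility resolution for a RO read at readTs.
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

final class WriteIntentResolution {
    record Version(Object row, long commitTs, boolean committed, UUID txId) {}

    interface VersionChain {
        Version head();                        // newest version, possibly an uncommitted write intent
        Version committedBefore(long readTs);  // newest committed version with commitTs <= readTs, or null
    }

    interface TxStateResolver {
        /** Resolves the txn outcome; completes with the commit timestamp, or null if aborted / still pending. */
        CompletableFuture<Long> resolveCommitTs(UUID txId);
    }

    static CompletableFuture<Object> resolve(VersionChain chain, long readTs, TxStateResolver txState) {
        Version head = chain.head();
        if (head.committed()) {
            // No write intent: plain timestamp visibility check.
            Version visible = chain.committedBefore(readTs);
            return CompletableFuture.completedFuture(visible == null ? null : visible.row());
        }
        // Uncommitted write intent: ask the coordinator / commit partition for the txn state.
        return txState.resolveCommitTs(head.txId()).thenApply(commitTs -> {
            if (commitTs != null && commitTs <= readTs) {
                return head.row();             // the intent is committed and visible at readTs
            }
            Version visible = chain.committedBefore(readTs);
            return visible == null ? null : visible.row();
        });
    }
}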
The example of a write intent resolution:
There is another subtle issue. Assume client 1 reads from a partition P with readTs=100 and sees an empty partition. After the read, client 2 commits a value to partition P. Normally its txn will use commitTs >= readTs, because the first read operation installs its HLC = readTs, so a repeated read with the same timestamp will not see the changes from client 2. But there is a caveat: the primary replica for the partition can die before replicating the updated HLC (applied by the RO read). In this case the txn will be remapped to a new primary replica and could commit with a lower HLC if no precautions are taken. A new leaseholder must be elected before a successful remap. The leaseholder disjointness invariant guarantees that any commit timestamp assigned on the new leaseholder is greater than readTs, which lies within the old lease interval.
Each unresolved version chain can be associated with a volatile readTs, which is initially set to creation time, to avoid unnecessary resolution by the transactions in the past. This timestamp is adjusted by RO transactions. If readTs(T) < readTs(V), transaction T can skip the resolution step.
It is also possible to eagerly resolve write intents for long running RW transactions in batches, keeping the timestamp cache fresh.
RO transactions can be executed on non-primary replicas. Write intent resolution alone doesn’t help here, because a write intent for a committed transaction may not yet be replicated to the replica. To mitigate this issue, it’s enough to run readIndex on each mapped partition leader, fetch the commit index and wait on the replica until it is applied. This guarantees that all required write intents are replicated and present locally. After that, the normal write intent resolution will do the job.
There is a second option, which doesn’t require a network RTT. We can use a special low watermark timestamp (safeTs) per replication group, which corresponds to the apply index of a replicated entry: when the apply index is advanced during replication, the safeTs is monotonically incremented too. The HLC used for advancing the safeTs is assigned to a replicated entry in an ordered way.
Special measures are needed to periodically advance the safeTs if no updates are happening. It’s enough to use a special replication command for this purpose.
All we need during an RO txn is to wait until the safeTs advances past the RO txn readTs. This is illustrated in the diagram:
Here transactions T1 and T2 update different keys belonging to the same partition.
OpReq(w1(x)) and OpReq(w2(x)) are received concurrently. Each write intent is assigned a timestamp in a monotonic order consistent with the replication order. This can be done, for example, when replication entries are dequeued for processing by the replication protocol (we assume entries are replicated sequentially).
It’s not enough to only wait for the safeTs - it may never advance due to the absence of activity in the partition. Consider the next diagram:
We need an additional safeTsSync command to propagate a safeTs event in case there are no updates in the partition.
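A minimal sketch of the per-partition safeTs bookkeeping described above (the tracker API is an assumption; advance() would be driven by applied replication entries, including safeTsSync):

// Sketch: a per-partition safe timestamp tracker. RO reads block until the replicated
// safeTs passes their readTs; a periodic safeTsSync command keeps it moving when idle.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;

final class SafeTimeTracker {
    private volatile long safeTs = 0L;
    private final ConcurrentSkipListMap<Long, CompletableFuture<Void>> waiters = new ConcurrentSkipListMap<>();

    /** Called when a replicated entry (update or safeTsSync) with timestamp ts is applied, in order. */
    void advance(long ts) {
        safeTs = Math.max(safeTs, ts);
        // Complete every waiter whose readTs is now covered.
        waiters.headMap(safeTs, true).forEach((readTs, fut) -> {
            waiters.remove(readTs);
            fut.complete(null);
        });
    }

    /** Returns a future that completes once safeTs >= readTs; RO reads wait on it before scanning. */
    CompletableFuture<Void> waitFor(long readTs) {
        if (safeTs >= readTs) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> fut = waiters.computeIfAbsent(readTs, ts -> new CompletableFuture<>());
        if (safeTs >= readTs) { // re-check to avoid missing a concurrent advance
            waiters.remove(readTs);
            fut.complete(null);
        }
        return fut;
    }
}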
However, the first approach seems easier to implement, as is illustrated by the diagram, so we can stick with it from the start.
The minor issue is that the safeTs advancement consumes cluster resources even in the absence of load.
Note that we can avoid the safeTs completely if a txn is considered committed only when its write intents are replicated to all replicas.
We can define a row version lifetime in the range [now - TTL, now], where TTL is a preconfigured constant, for example 10 minutes. All data below the low watermark is considered garbage and subject to garbage collection.
Active RO transactions prevent a version chain from expiration until they are finished (they “pin” it). This can be implemented using a reader ref counter.
If a RO transaction attempts to map beyond the liveness range, it is immediately aborted. The latest committed version is never removed, even if its timestamp is beyond the liveness range.
It is allowed to remove intermediate row versions from a chain, if they are not in use. For example, if we have a version chain like:
ts=40 val=4
ts=30 val=3
ts=20 val=2
ts=10 val=1
and the low watermark has reached 30, we can remove the versions with ts=30 and ts=20, even if a RO tx is in progress at readTs=10.
TBD epochs for GC
Scans performed under RO transactions can be remapped if a mapped node has failed. For this to work, a table must track the latest scanned value for each mapped node. In case of failure, the local node iterator can be reinstated from this point on another live replica. This can help to minimize RO transactions restarts on unstable topology.
Such a transaction always returns the latest committed version. No locks are required.
There is an interesting paper which showcases the tradeoffs between RO and RW transactions. It postulates the SNOW theorem, which states that a RO transaction can have the S, N, O and W properties, but only 3 of the goals can be achieved simultaneously.
A SNOW-optimal transaction achieves 3 of the 4 goals. Our RO algorithm can achieve N-O-W, depending on data staleness. Reading more recent data can add overhead to the N and O properties: for N, the read never blocks, but it can be delayed by a network RTT (in the worst case, for the commit replication time); for O, an additional round trip is required for write intent resolution. N-O-W is achieved at a readTs <= safeTs - a timestamp below which no active transactions exist.
For S, the protocol achieves serializability. Note that RO transactions performed at readTs always see the writes performed by RW transactions at commitTs <= readTs.
Due to loosely synchronized clocks, RO transactions at the same absolute time may see different snapshots, within the interval defined by the clock uncertainty window.
We can improve the global visibility of RO transactions.
Can we use the RO protocol with S2PL? The answer seems to be no.
Consider the history, which is allowed by S2PL:
H = r1(x) w1(y) w2(x) c2 c1 // r1 read lock is released before c1
Assume a RO T3 and the history
H1 = r1(x) w1(y) w2(x) c2 r3(x) r3(y) c1
This history is not serializable.
However, we can improve the concurrency by allowing RO transactions to invalidate conflicting RW transactions. Currently, the design goal is to minimize the interaction, but this can be reconsidered later.
There is a special kind of transaction which performs a lot of reads (for example, a full table scan) and a few writes at the end. If implemented as RW transactions, they can cause OLTP stalls. The suggested solution is to implement the reading part as a RO transaction and the writing part as a RW transaction.
TBD example (referral calculation)
RO transactions dominate various workloads, like WWW services. They can also be used for reporting tools and analytical processing. So, they are preferred anywhere up-to-date data is not required.
The idea is very simple: a write transaction installs an update with a timestamp in the past. This update can then be retrieved by a read-only transaction using its local HLC. The timestamp must be less than the minimum current time among all nodes, but larger than the previously committed timestamp.
The only difference with a normal rw transaction is that we need to delay the commit until all replicas receive the update. This is required for a scenario where a replica is lagging and doesn't receive log entries. This causes either
The downside is that we need to resolve write intents until the write transaction is completed, which can take some time on big topologies. This can be mitigated by installing write intents in the future and ignoring them unless the current timestamp (HLC.now()) falls within the clock uncertainty interval of the intent, in which case we must wait it out. This optimization can be postponed until later.
The lock-free read-write transactions can be used to atomically apply metadata.
For example, let’s consider schema changes for a table. All coordinators must see applied changes atomically, but we don’t want to use normal locking - this will create enormous contention.
We must associate a schema version with the start of a transaction. This can be done on the first table operation, for each enlisted table. At the end of the transaction the initial schema version is validated against the current version. If they are compatible (how exactly is defined by the application logic), the txn can be committed; otherwise it is aborted.
This guarantees that a transaction uses the same schema for all its operations and will not be committed if the schema is no longer up-to-date and is incompatible.
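A small sketch of this schema version capture and commit-time validation, assuming a hypothetical SchemaRegistry with an application-defined compatibility rule:

// Sketch: per-transaction schema version capture and commit-time validation.
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

final class SchemaVersionValidation {
    interface SchemaRegistry {
        int latestVersion(String tableName);
        boolean compatible(int usedVersion, int currentVersion); // defined by application logic
    }

    private final SchemaRegistry registry;
    // txId -> (table -> schema version captured on the first operation against that table)
    private final Map<UUID, Map<String, Integer>> usedVersions = new ConcurrentHashMap<>();

    SchemaVersionValidation(SchemaRegistry registry) {
        this.registry = registry;
    }

    /** Called on the first operation of txId against the table; pins the schema version. */
    int pin(UUID txId, String table) {
        return usedVersions.computeIfAbsent(txId, id -> new ConcurrentHashMap<>())
                .computeIfAbsent(table, registry::latestVersion);
    }

    /** Called before commit: the txn may commit only if every pinned version is still compatible. */
    boolean validateOnCommit(UUID txId) {
        Map<String, Integer> pinned = usedVersions.getOrDefault(txId, Map.of());
        return pinned.entrySet().stream()
                .allMatch(e -> registry.compatible(e.getValue(), registry.latestVersion(e.getKey())));
    }
}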
This section covers recovery for transactions. It describes how the txn protocol recovers from various kinds of failures: a txn coordinator failure, an enlisted data node failure, and a majority loss for a partition, which is a particular case of data node loss.
First consider the simplest case: a data node failure. In this case the txn coordinator is alive, has the full txn topology, and can commit or abort the transaction directly, so the transaction is never abandoned and makes progress. A single data node failure doesn’t prevent a transaction’s progress as long as the majority is preserved, except in the case of read lease expiration. Even if a whole partition is lost, as long as we know that all writes are replicated, the transaction can be committed.
There is a special case for the RW conflict. We assume read locks are neither persistent nor replicated, so their state is lost on primary replica failure. This makes it impossible to detect RW conflicts, so we need to avoid them somehow.
Consider a scenario when a read was made from a primary replica and it goes offline before a transaction commits. Additional measures are required to preserve read safety.
Because read locks are not replicated, it's possible that another transaction is mapped to a new primary replica and invalidates a read for the current not yet committed transaction.
Assume the transactions
T1: r1(x), w1(y), c1
T2: r2(y), w2(x), c2
Due to the PR1 failure and the lost read lock, the following history is possible.
H = r1(x) r2(y) w2(x) c2 w1(y) c1
This history is not serializable.
We prevent this history by not committing TX1 if its commit timestamp lies outside lease ranges.
To achieve this, we need to periodically refresh the lease interval for the read primary replicas if a transaction runs too long.
In the following diagram the lease refresh discovers that the lease is no longer valid, and the transaction is aborted.
Leases must be refreshed periodically by the TxnCoordinator, depending on the lease duration. Successful refreshing of a lease extends the transaction deadline up to the lease upper bound.
A reasonable refresh interval is D/2, where D is the lease duration.
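An illustrative sketch of such periodic lease refreshing; the LeaseService and TxControl interfaces are assumptions:

// Sketch: periodic lease refresh for long-running RW transactions, running at D/2.
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class LeaseRefresher {
    interface LeaseService {
        /** Refreshes the lease for the partitions read by txId; completes with the new lease upper bound. */
        CompletableFuture<Long> refresh(UUID txId);
    }

    interface TxControl {
        void extendDeadline(UUID txId, long leaseUpperBound);
        void abort(UUID txId); // the lease is gone, reads are no longer protected
    }

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    /** Schedules refreshes every D/2; the task would be cancelled when the txn finishes (omitted). */
    void schedule(UUID txId, long leaseDurationMillis, LeaseService leases, TxControl tx) {
        long period = leaseDurationMillis / 2; // a reasonable refresh interval is D/2
        scheduler.scheduleAtFixedRate(() ->
                leases.refresh(txId).whenComplete((upperBound, err) -> {
                    if (err != null || upperBound == null) {
                        tx.abort(txId);                      // the lease is no longer valid
                    } else {
                        tx.extendDeadline(txId, upperBound); // extend up to the lease upper bound
                    }
                }), period, period, TimeUnit.MILLISECONDS);
    }
}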
Another issue arises due to the absence of worst-case time guarantees on replication to the majority. Consider the picture:
This picture shows a possible commit reordering between T1 and T2 due to a delay in applying the commit for T1, despite the fact that T2 has a larger commit timestamp. This leads to a situation where T2 becomes visible before T1. But it does not seem to cause any consistency issues, because the up-to-date data is correct and possible issues can only arise for RO transactions at timestamp T2. Luckily, we already have the intent written for T1, and the write intent resolution procedure will eventually wait for the committed intent. So, for now, I consider this reordering harmless.
This means a RW transaction becomes abandoned if it has not started to commit. All locks and write intents created by the transaction will stay forever if no additional measures are taken.
One way to resolve the state of such txns is to use the discovery service to detect the coordinator failure, locally search for all txn ids initiated by the failed node, group them by the commit partition node, and send a TxnOutcomeRequest to each primary replica, containing the list of corresponding txIds to check. The commit partition group is the ultimate source of information about the txn finish state. There are several check outcomes.
RO transactions are handled differently. They rely on write intent resolution to make progress. This process is described in detail in Reading the data. But it’s possible that both paths are unavailable due to failed or unresponsive nodes. If for some reason write intent resolution is not possible (or has timed out), the RO txn must be failed with a corresponding exception to avoid the risk of reading inconsistent data.
On restart, a node’s volatile state is lost. No special handling is required for unresolved write intents on node restarts. If they are accessed in this state, the transaction will try to resolve their state via the commit partition (because the local txn map doesn’t contain the coordinator), as described in the RW algorithm section. If the commit partition doesn’t contain the txn state, the transaction is abandoned and must be aborted. The transaction is added to the abandoned list and is processed as described in the Coordinator Failure section. As soon as the aborted state is written, the current txn can proceed.
The edge case for this scenario is a full cluster restart under the load.
The current protocol includes a cleanup phase as part of the commit. For this purpose, a list of enlisted partitions is passed together with the finish request from the txn coordinator to the commit partition group. As part of the commit, a cleanup request is sent to all enlisted partitions to release the locks and commit the uncommitted versions.
But it’s possible that a commit partition group primary replica fails in the middle of a process.
To avoid this situation, we must ensure that, as long as a majority is preserved, all enlisted partitions will eventually receive the message.
We can use a simple protocol backed by consensus to achieve this, previously called a durable cleanup. We use two raft commands for this purpose:
FinishCommand(TxState txState, Map<PartId, Boolean> finishState);
where finishState holds true if the corresponding partition group has acked the unlocking, and false otherwise (the default value).
As soon as the FinishCommand is committed to the group, a background process periodically sends a CleanupRequest to the enlisted partitions. Each cleanup request is processed by a partition group, which acks the corresponding map entry by sending a CleanupAckRequest. If a primary replica fails, this is not a problem, because CleanupRequests are periodically resent. As soon as all partitions have acked, the background process stops sending unlock requests. Unlock requests are idempotent, so sending many of them shouldn’t be a problem.
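A rough sketch of the background resend loop driven by the replicated finishState map; the messaging interface and names are assumptions:

// Sketch: the durable cleanup loop. Resends are harmless because cleanup is idempotent.
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

final class DurableCleanup {
    interface CleanupMessaging {
        /** Sends an idempotent CleanupRequest; the partition answers with a CleanupAckRequest later. */
        void sendCleanupRequest(String partitionId, UUID txId);
    }

    // Mirrors the replicated FinishCommand state: partition -> acked flag (false by default).
    private final Map<String, Boolean> finishState = new ConcurrentHashMap<>();
    private final UUID txId;
    private final CleanupMessaging messaging;

    DurableCleanup(UUID txId, Iterable<String> enlistedPartitions, CleanupMessaging messaging) {
        this.txId = txId;
        this.messaging = messaging;
        enlistedPartitions.forEach(p -> finishState.put(p, false));
    }

    /** Invoked periodically until every partition has acked. Returns true while more rounds are needed. */
    boolean resendPending() {
        finishState.forEach((partition, acked) -> {
            if (!acked) {
                messaging.sendCleanupRequest(partition, txId);
            }
        });
        return finishState.containsValue(false);
    }

    /** Applied when a CleanupAckRequest arrives (replicated via the consensus group in the real protocol). */
    void onAck(String partition) {
        finishState.put(partition, true);
    }
}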
This is a reason for unavailability: some transactions can’t be finished until the majority is restored.
Below is the list of components that are required for CC execution. Each component contains a minimal set of required methods. This section may serve as a hint for implementors.
Implements methods for accessing multi-version data under txn context. One per data table partition. Contains all primary and secondary indexes.
// Inserts a row and adds it to the indexes. Returns a rowId for the inserted row.
CompletableFuture<RowId> put(Tuple row, UUID txId);

// Updates a row and related index entries. Returns a previous value.
CompletableFuture<Tuple> update(RowId rowId, Tuple newVal, UUID txId);

// Removes a row and related index entries. Returns a removed row.
CompletableFuture<Tuple> remove(RowId rowId, UUID txId);

// Gets a tuple by rowId.
CompletableFuture<Tuple> get(RowId rowId, UUID txId);

// Executes the query in RW mode.
AsyncCursor<RowId> query(@Nullable Query query, UUID txId);

// Executes the query in RO mode.
Cursor<Tuple> query(@Nullable Query query, Timestamp readTs);

// Commits a transaction with a timestamp.
CompletableFuture<Boolean> commit(UUID txId, Timestamp commitTs);

// Aborts a transaction.
CompletableFuture<Boolean> abort(UUID txId);
Query implements the data retrieval logic and can use indexes. Possible types are:
The returned cursor must support async execution, because scans can acquire locks in RW mode.
Not all indexes can support all filter types. TBD: define new index types, like BloomFilter, etc.
The proposed operations should support batches.
Used to manage locks (easily guessed). It has a volatile state, which is lost on restart. One per table/index. Used by MVStore to acquire/release locks as required by CC protocol.
LockTable provides the following operations:
Lock provides the following operations:
Holds a transaction state. Currently two states are supported: COMMITTED and ABORTED. One per data node. Used by MVStore to resolve write intents and for txn recovery purposes.
TxnStateManager provides the following operations:
This state is replicated using the ReplicationManager for a partition, calculated by a transaction id. Uses a separate TxnStateStore for holding txn state.
Coordinates transaction execution. Can be deployed on any cluster node, not necessarily a data node. One per node. Holds a map of active transactions UUID→InternalTransaction, assuming a txId is implemented using the UUID class.
// Assigns a RW transaction id.
CompletableFuture<UUID> beginAsync();

// Commits the transaction.
CompletableFuture<Boolean> commitAsync(UUID txId);

// Rolls back the transaction.
CompletableFuture<Boolean> rollbackAsync(UUID txId);
There is no need to explicitly start the RO transaction. A user must pass a timestamp instead of txId to execute a RO transaction.
Internal facade for transactional record storage. Can be deployed on any cluster node, not necessarily a data node. Caches information about the table partition topology and the last known table schema. One per node. Executes transactional ops by sending requests to the enlisted partition data nodes, defined by the calculated data affinity.
The set of methods is similar to MVStore. Additionally, snapshot scans are allowed to be executed on an exact node by passing a custom partition map:
Cursor<Tuple> scan(Tuple keyLower, boolean lowerInclusive, Tuple keyUpper, boolean upperInclusive, @Nullable Timestamp ts, @Nullable Map<PartitionId, ClusterNode> partMap);

// Other methods
A snapshot scan is executed on backups out of the box and, if the partMap is not null, uses this map to define the exact partition-to-node mapping.
Provides replication capabilities to MVStore. The actual implementation can use any replication protocol, for example, RAFT. One per replicated partition. Delegates to MVStore to execute “the apply” phase of a replication.
CompletableFuture<Peer> getOrWaitLeader();
@Nullable Peer leader();
boolean isLocal(Peer peer);
List<Peer> peers();
List<Peer> learners();
<T> CompletableFuture<T> apply(Command command);
boolean isLeaderLeaseValid(HLC ts);
Extends the MVStore functionality with replication support and leases validity for reads. Uses ReplicationManager for execution of replication related logic.
The requirements for a correct snapshot are:
RW transactions don't require any kind of clock synchronization. So, if RO txns are not used, no kind of clock synchronization is required. We can make skew tracking optional.
Both requirements are fulfilled by running some time synchronization protocol, like NTP (which must be properly configured). But we must be resilient to situations where it is malfunctioning.
We should track clock skew bounds and fail violating nodes. Nodes can periodically gossip their physical clocks and validate their own clocks for correctness. If the delta reaches, for example, 80% of maxClockSkew, the node is shut down (or a warning is printed). This protects us from unbounded clock skew.
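A minimal sketch of such a gossip-based clock skew check; the 80% threshold and the shutdown hook are taken from the paragraph above, the rest is an assumption:

// Sketch: validating the local clock against gossiped peer timestamps.
import java.util.Collection;

final class ClockSkewValidator {
    private final long maxClockSkewMillis;
    private final Runnable shutdownHook; // or a warning, depending on configuration

    ClockSkewValidator(long maxClockSkewMillis, Runnable shutdownHook) {
        this.maxClockSkewMillis = maxClockSkewMillis;
        this.shutdownHook = shutdownHook;
    }

    /** Called when a gossip round delivers the physical clocks observed on peer nodes. */
    void validate(long localPhysicalTime, Collection<Long> peerPhysicalTimes) {
        for (long peerTime : peerPhysicalTimes) {
            long delta = Math.abs(localPhysicalTime - peerTime);
            if (delta >= 0.8 * maxClockSkewMillis) {
                shutdownHook.run(); // the node is shut down (or a warning is printed)
                return;
            }
        }
    }
}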
The system must be resilient to spurious clock adjustments on a single node:
But this is not possible in general. For example, a cluster can be stopped and the clocks moved into the past. We can’t do anything in this case (TBD: probably track the skew using the CMG).
Node clock skew must be validated on join.
Let’s take a look at the proposed algorithm properties:
It looks like a well-rounded solution.
This proposal doesn't attempt to describe all possible corner cases and may even contain bugs. In essence, this is a high-level document. Many implementation details are intentionally omitted and must be clarified during the development phase.
High contention scenarios are not desirable for the suggested locking-based CC, but they can be mitigated by using an appropriate deadlock prevention strategy, like NO_WAIT.
In fact, high contention scenarios are a sign of bad application design: they don't scale and render the clustering idea senseless.
We can reason more precisely about performance as soon as some benchmark results are ready. Next steps, like implementing other kinds of CC, can be decided after analyzing the results.
https://lists.apache.org/thread/50g0n8tkjwr0s0vq66cd9h46z1lqj1gy
https://github.com/ascherbakoff/ai3-txn-mvp MVP for RW based CC, with data and index versioning support.