Consider a table's schema being altered just before a data transaction starts. Some nodes might have already applied the new table definition, while others have not. As a result, the transaction might be committed with different versions of the table schema on different nodes (or some nodes might already consider the table dropped while others do not), which breaks consistency.

To ensure consistency, we must use the same version of the table schema when committing a transaction across all participant nodes.

Additionally, it is imperative that a client is able to "view their own schema updates." In the event that a client initiates a schema update and receives confirmation that the update has been implemented, they must observe that the update has taken effect on any node they access.


A schema update U for an object Q looks like "since the moment Tu, object Q has schema S". Tu is the activation moment of the new schema version S; in other words, S activates at Tu. Schema version S is effective from its activation moment Tu until the activation moment Tu’ of the next schema version S’ of the same object (including the start, excluding the end of the interval), or indefinitely if S is currently the most recent version of the schema; so the interval is either [Tu, Tu’) or [Tu, +inf).
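The half-open intervals above can be illustrated with a minimal lookup sketch. The `SchemaHistory` class and its method names are hypothetical, and timestamps are plain integers for simplicity; this is not the actual storage implementation.

```python
from bisect import bisect_right


class SchemaHistory:
    """Schema versions of one object, ordered by activation moment (hypothetical)."""

    def __init__(self):
        self._activations = []  # activation moments Tu, strictly increasing
        self._schemas = []      # schema version effective from the matching Tu

    def add(self, tu, schema):
        # Each new version must activate after the previous one.
        if self._activations and tu <= self._activations[-1]:
            raise ValueError("activation moments must strictly increase")
        self._activations.append(tu)
        self._schemas.append(schema)

    def effective_at(self, t):
        """Return the schema whose interval [Tu, Tu') contains t, or None."""
        # bisect_right counts the versions with Tu <= t.
        i = bisect_right(self._activations, t)
        return self._schemas[i - 1] if i else None


history = SchemaHistory()
history.add(10, "S")
history.add(20, "S'")
print(history.effective_at(19))  # S  (still inside [10, 20))
print(history.effective_at(20))  # S' (the start of an interval is included)
```

The start of each interval is inclusive and the end is exclusive, so a lookup exactly at Tu’ already returns S’.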

A schema update U is visible to a node N at moment Top if the node N got a message M containing this update at a moment T<=Top. Schema update messages are distributed using the Meta Storage RAFT group, so consuming a message means applying it to a state machine. As a result, the update messages are delivered in order (hence, if a message is not delivered, none of the messages that follow it will be delivered) and are not duplicated. This holds because there is exactly one Meta Storage RAFT state machine at each node.

We want all nodes to give the same answer, for any given timestamp Tq, to the question ‘is a schema update U visible and effective at the moment Tq?’

This can be reformulated: a schema update saying that a new schema version activates at Tu splits all transactions into two sets: those that commit strictly before Tu (Tc < Tu) and hence operate on the old schema version, and those that commit at or after Tu (Tc ≥ Tu) and hence operate on the new schema version, regardless of the nodes that participate in the corresponding transaction.

An important thing here is the clock we use to define the timestamps. 

As the nodes’ clocks do not have to be synchronized, what matters is the node’s local resolution of "whether a timestamp is reached by the node’s clock or not"; it is not relative to some absolute clock (about which each node only has an approximate idea).

Let’s call the property defined above the Atomicity Property (AP).

The following property makes AP hold: no node can get a message about an update U(Tu,S) after the node has fetched (and used in handling some transaction) a schema for a moment Top >= Tu.

To maintain real-time atomicity, we would need to coordinate between all nodes of the cluster on each schema update to make sure they get the update before they can commit a transaction using it, so a lagging node would make all nodes wait (not processing any transactions). Alternatively, if a schema update might not be acknowledged by all nodes in a timely fashion, it could be held in an inactive state for an indeterminate period of time, creating a form of unavailability of the whole cluster.

That's why we separate the timestamps assigned to the events (like schema version activation and schema version fetches) from the actual moments when the corresponding events tagged with these timestamps are processed on each node. We have:

  1. A monotonic clock from which we take timestamps used to tag events. The timestamps are totally ordered. These are reference timestamps on a reference timeline.
  2. Events tagged with these timestamps (hence, also totally ordered)
  3. On each node, these events are ‘executed’ or ‘applied’ in the node timeline in the same total order, but this execution/application might happen with some lag after the events’ timestamps were taken from the clock. Distances between application moments of the corresponding events do not have to be equal (or proportional to) the distances between the corresponding timestamps labelling the events: only the ordering matters.
  4. In real time, the application of events might happen in different moments of time on different nodes.
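Points 3 and 4 can be illustrated with a toy replay, assuming integer timestamps and a made-up per-event lag for each node. The `replay` function is purely illustrative and not part of any real API.

```python
def replay(events, lags):
    """Apply timestamp-ordered events on one node.

    events: list of (reference_timestamp, label), sorted by timestamp.
    lags:   per-event delay before this node is able to apply the event.
    Returns (local_application_moment, label) pairs in application order.
    """
    applied = []
    local_now = 0
    for (ts, label), lag in zip(events, lags):
        # An event is never applied before its predecessor in the total order.
        local_now = max(local_now, ts + lag)
        applied.append((local_now, label))
    return applied


events = [(10, "U1"), (20, "U2"), (30, "U3")]
fast = replay(events, [1, 1, 1])    # [(11, 'U1'), (21, 'U2'), (31, 'U3')]
slow = replay(events, [25, 2, 40])  # [(35, 'U1'), (35, 'U2'), (70, 'U3')]
```

Both nodes apply U1, U2, U3 in the same order, although the application moments and the distances between them differ.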

Schema change flow

  1. Client issues a statement that modifies database schema (CREATE/ALTER/DROP TABLE, CREATE/DROP INDEX, etc)
  2. The new schema version (with its Tu) is propagated to all cluster nodes using the Meta Storage (as we have a Meta Storage RAFT group member, a learner or not, on every cluster node)
  3. Each node acts independently from other nodes in interpreting the Tu, so no cluster-wide coordination between nodes is needed when partitioning the transactions. Schema version activation happens "automatically" when the node is sure it has all the updates it needs for processing the transaction it is processing. To make sure it has all the updates, a node might need to wait.
  4. Before an update is reported as installed to the client, an additional wait happens to make sure that the client’s request on any node will see the installed update after the installation call returns. The required wait is until the schema update activation becomes non-future on all nodes (taking clock skew into account, so the client must wait for at least Now ≥ Tu+CSmax).
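The wait in step 4 comes down to simple arithmetic. Below is a minimal sketch, assuming integer timestamps on the coordinating node's clock; the function name `remaining_client_wait` is hypothetical.

```python
def remaining_client_wait(now, tu, cs_max):
    """How much longer to wait before reporting the update as installed.

    The update may be reported once Now >= Tu + CSmax, i.e. once Tu is
    non-future on every node even under the maximum clock skew.
    """
    return max(0, tu + cs_max - now)


print(remaining_client_wait(now=100, tu=105, cs_max=3))  # 8
print(remaining_client_wait(now=110, tu=105, cs_max=3))  # 0
```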

A simplified diagram demonstrating the flow of information on a schema update (low-level details (timestamp arithmetics for optimization) are not shown):

Making sure a node does not miss any schema messages that are relevant to a reference timestamp

The clocks may be skewed and the network is asynchronous (so message delivery might take an arbitrarily long time). This means that we cannot just use the naive approach of installing the update to the future and processing transactions on the nodes right away.

Briefly, the idea is to do the following:

  1. Use the fact that Meta Storage is available on all cluster nodes (thanks to Learners) to distribute the schema updates
  2. Wait for Meta Storage safeTime on a node to reach the required value to make sure the node sees all the updates it needs to process the transaction in question
  3. Actually wait for a safeTime that is a Delay Duration (DD) earlier than needed (to avoid waiting in most cases), and compensate for this by making each update activate DD in the future, which keeps the timestamp math on the nodes simple

Using safe time

We must prevent a node from processing a transaction at moment Now before the node has received all the schema updates with activation moment not later than Now. We are going to use Meta Storage for storing the schema updates (note that this implies that we use HLC from now on, not the physical clock that we used in the warm-up section). We are going to read schema updates from the Meta Storage on other nodes. To make sure we never read stale data (breaking AP), we could use the same approach that we use to make reads (in RO) from non-primary replicas: Safe Time.

Message M saying ‘Starting at moment Tu, schema is S’ is sent with timestamp Tm (that moves safeTime forward). A schema storage client that wants to obtain a schema active at moment Top waits for Meta Storage’s safeTime>=Top. Tu must be ≥ Tm (because we’ll construct messages in this way), so, if Tm<Top, after waiting, the client sees M (as Tm<Top<=safeTime, so Tm<safeTime, hence M is visible to the client’s node) and knows that S has been activated at Tu before it obtains schema for moment Top.

This means that AP is maintained. Let’s suppose the opposite: a client obtained a schema for some moment Top>=Tu>=Tm without knowing about M. But as the client had to wait for safeTime>=Top+CSmax, Tm must have become ≤ safeTime (even in the presence of clock skew), so the client must have seen message M. We have a contradiction.
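The safe-time rule can be modelled with a small, single-threaded sketch. It makes several assumptions: timestamps are integers, the `MetaStorageView` class and its methods are hypothetical, and `advance_safe_time` stands in for whatever moves safeTime forward when no schema update is sent. Instead of blocking, the reader simply asks whether it would still have to wait.

```python
from bisect import bisect_right


class MetaStorageView:
    """One node's local view of schema updates (hypothetical model)."""

    def __init__(self, initial_schema):
        self.safe_time = 0
        self._initial = initial_schema
        self._activations = []  # Tu values, in delivery order
        self._schemas = []

    def deliver(self, tm, tu, schema):
        # Messages are constructed so that Tu >= Tm and arrive in Tm order.
        if tu < tm or tm < self.safe_time:
            raise ValueError("message violates Tu >= Tm or delivery order")
        if self._activations and tu <= self._activations[-1]:
            raise ValueError("activation moments must strictly increase")
        self._activations.append(tu)
        self._schemas.append(schema)
        self.safe_time = tm  # the message timestamp moves safeTime forward

    def advance_safe_time(self, t):
        # Assumed stand-in for safeTime progress without schema updates.
        self.safe_time = max(self.safe_time, t)

    def can_read(self, top):
        """True once safeTime >= Top, i.e. no update with Tm <= Top is missing."""
        return self.safe_time >= top

    def schema_at(self, top):
        if not self.can_read(top):
            raise RuntimeError("must wait for safeTime >= Top")
        i = bisect_right(self._activations, top)
        return self._schemas[i - 1] if i else self._initial
```

A reader that calls `schema_at(top)` only after `can_read(top)` holds cannot miss an update with Tu ≤ Top, because such an update has Tm ≤ Tu ≤ Top ≤ safeTime and has therefore already been delivered.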

To maintain the ‘client sees their own schema changes’ property, we must make the client wait for CSmax after installing an update to make sure that Tu has become non-future on every node, so that whichever node the client sends a request to, the client will see its update.

This approach guarantees correctness (AP is always maintained), but every node (that is not the Meta Storage leader), while trying to obtain a schema for the moment Top=Now, will have to wait for safeTime visible on this node to reach Top, which will take PD (propagation duration). So each transaction will be delayed for PD (on average). This creates a performance problem: latency increases.

Waiting for safe time in the past

Let’s make a couple of modifications.

  1. Choose some delay duration DD. DD should be ≥ typical PD (so that we account for the network latency) and ≥ CSmax (so that clock skew does not make us wait).
  2. When creating a message M telling the cluster about a schema update activation moment, choose the message timestamp Tm (moving safeTime forward) equal to Now, but assign Tu (activation moment) contained in that M to be Tm+DD
  3. If a node wants to read the schema at Top=Now, it will wait for safeTime≥Top-DD instead of Top and then it will take a schema at moment Top. Most of the time, it will not need to wait, or the wait will be a lot shorter. (Please note that the subtraction only affects the safeTime we are waiting for, not the activation timestamp we look up after having waited; this makes it easier to reason about activation timestamps).
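The three modifications above can be sketched as follows. The sketch assumes integer timestamps and an arbitrary DD value; the names `make_update`, `required_safe_time`, and `schema_at` are hypothetical helpers, not an existing API.

```python
from bisect import bisect_right

DD = 5  # delay duration in timestamp units (illustrative value)


def make_update(now, schema, dd=DD):
    """Build message M: timestamp Tm = Now, activation moment Tu = Tm + DD."""
    return {"tm": now, "tu": now + dd, "schema": schema}


def required_safe_time(top, dd=DD):
    """safeTime a node must observe before reading the schema at Top."""
    return top - dd


def schema_at(top, safe_time, delivered, initial="S0", dd=DD):
    """Schema effective at Top, or None if the node still has to wait.

    delivered: messages already received by this node, in delivery order.
    Only the safeTime check uses Top - DD; the lookup itself uses Top.
    """
    if safe_time < required_safe_time(top, dd):
        return None
    activations = [m["tu"] for m in delivered]
    i = bisect_right(activations, top)
    return delivered[i - 1]["schema"] if i else initial


m = make_update(now=100, schema="S1")           # Tm = 100, Tu = 105
print(schema_at(103, safe_time=98, delivered=[]))    # S0: no wait, M cannot matter yet
print(schema_at(106, safe_time=98, delivered=[]))    # None: needs safeTime >= 101
print(schema_at(106, safe_time=101, delivered=[m]))  # S1
```

In the first call the node has not received M yet, but it does not need to wait: any message it might be missing has Tm > 98 and therefore Tu > 103.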

The downside is that a schema change becomes visible only after some time. Even if a message is delivered faster than DD (or we are on the leader), we will still not see the new schema until DD passes. We can alleviate this by returning control to the user only after an additional time (DD) has passed since the user applied the schema change, so the user that created the schema update cannot see the system in an inconsistent state (that is, be unable to see the change they have just introduced).

Here is a diagram that exemplifies the mechanism with the cases of 2 nodes: one healthy (that does not need to wait), another lagging (that has to wait).

Risks and Assumptions

Schema updates are not visible right away

Even on the Meta Storage leader, a just-installed schema update is not visible right away, because it activates DD in the future. This might be confusing for a user who has just issued an ALTER TABLE query, because ‘read own writes’ is violated. This can be alleviated by making sure that we complete the future used to install the schema update only after DD has passed, but if a client fails before the wait elapses and reconnects, they might expect to see the update.

A node not receiving Meta Storage updates freezes processing of transactions in which the node participates

A primary replica needs to wait for safeTime to reach Top-DD before processing a transaction at a reference timestamp Top. If the node does not get any Meta Storage updates (i.e. it lags behind the Meta Storage leader), it cannot proceed.

Possible situations:

  1. The primary replica actually lags behind the whole cluster. In such a situation, we need to promote another replica to become a primary. This case is not caused by the new mechanism introduced here; we should already have measures to handle it.
  2. The whole cluster lags behind the Meta Storage leader. This means that the whole cluster is in an uncontrollable state. In such a situation, we become unable to process transactions.

Note that if a primary P lags, then transactions where P does not participate are not affected.

An unpleasant thing is that a node lagging too much behind the Meta Storage freezes transaction processing even in the absence of any schema updates.

Discussion Links

// N/A

Reference Links

// N/A

