Proposers

Approvers

Status

Current state: COMPLETED

Discussion thread: here

JIRA: here

Released: <Hudi Version>

Abstract

The Hudi library enables creating, managing and modifying tables on the DFS using an engine agnostic client library. This gives clients a lot of flexibility to manage tables by embedding the library in their user code and running it as they need, on a schedule that suits them. Since there is no “external server” component to managing these tables, it also carries significantly less operational burden than maintaining and scaling such server-based solutions. Hudi currently provides ACID guarantees between writers and readers at the Snapshot Isolation level, and allows only a single client writer to mutate a table on the DFS. This RFC proposes to introduce the ability to run concurrent writers against a Hudi table, allowing file level concurrency and providing Serializable Snapshot Isolation using optimistic concurrency control.

Background

Motivation

Using a distributed processing engine like Spark, Hudi offers horizontal scaling to manage workloads of different sizes. With horizontal scaling, users can maintain similar runtimes and ingestion latencies into Hudi tables even during spiky traffic. Over time, we have noticed a lot of activity and requests from users for adding multi-writer capabilities to the Hudi library, to handle cases not covered by the built-in table services. There are many examples where this is helpful. From users, we have learnt that although scaling Hudi jobs is one way to handle spiky traffic, some fundamental business challenges necessitate multiple writers to a Hudi table. A classic example cited is backfill. Many times, due to either changes in business requirements or data corrections, pipelines need to backfill historical data into tables. As one can imagine, backfilling historical data can be orders of magnitude larger than the steady state ingestion throughput. One way to keep data freshness and job runtimes the same is to add more processing power to the same job, but there are cases where this option is not viable. First, such auto tuning capabilities are often not available to clients, so letting the same job process backfill data risks the latency of new incoming traffic to the table. Second, users are flexible about having different reliability guarantees for fresh, new data showing up in the table vs backfilling old data: they would rather have high reliability and guaranteed latency for steady state new traffic and lower guarantees for backfilled data. Additionally, in many organizations these are two entirely different teams, one responsible for ingesting new data and another for performing backfills/corrections of old data.

Fundamentals

Hudi currently supports a single writer model and uses MVCC to concurrently update a table via table services such as clustering, compaction and cleaning, thus allowing them to run asynchronously without blocking writers. Using MVCC, Hudi is able to provide Snapshot Isolation guarantees. Let's take a quick look at the different isolation levels and how they are ordered with respect to vulnerabilities such as dirty reads, non-repeatable reads and phantom reads. 


With Snapshot Isolation, readers get repeatable reads (the same result if queried multiple times during the same write/transaction) since each reader works on a "snapshot" of the database, i.e., the specific version of the latest committed data seen at the start of that transaction. This is possible because MVCC keeps multiple versions of data. However, this is only true when there are no write-write conflicts, also known as write skews. During a concurrent update of data, since each writer is working with its own "snapshot/version" of the data, snapshot isolation cannot guarantee that no updates are lost if the writers were independently modifying the same datum (record or file). Snapshot Isolation is hence vulnerable to update conflicts unless a conflict resolution strategy is applied. Update conflicts are generally solved in the following ways (we will not discuss READ_UNCOMMITTED since it violates Hudi's basic requirements anyway): 

  1. READ_COMMITTED isolation level uses pessimistic locks on the data being changed. 
    1. Pros
      1. Every writer always reads the latest committed data
    2. Cons
      1. Starvation, since writers contend for pessimistic locks and must wait until a lock is released
      2. Non-repeatable reads (the same record or file read multiple times during the same write/transaction) → this is a lower level of guarantee than what Hudi already provides with snapshot isolation
      3. Requires locks at record or file level to allow concurrent transactions to complete in parallel
  2. SERIALIZABLE isolation uses different techniques for achieving serializability, such as two-phase locking (2PL) and timestamp ordering for conflict avoidance. Such techniques are fairly complex and probably non-scalable overkill for a system like Hudi. The reasons for this are out of the scope of this RFC but can be understood by researching the need for 2PL and how it is implemented.
    1. Pros
      1. Highest form of isolation and provides many more guarantees than update conflicts (such as no phantom reads, conflict and view serializability etc)
    2. Cons
      1. Complex to implement and is generally something that is required for OLTP type systems 
      2. Out of scope of this RFC 
  3. SERIALIZABLE_SNAPSHOT_ISOLATION is a variant of serializable isolation without some of the other guarantees provided by serializable isolation level but still providing methodologies to handle update conflicts. This isolation level uses MVCC based snapshot isolation along with a way to resolve update conflicts. There could be multiple ways to achieve this. 
    1. PESSIMISTIC LOCKING
      1. Acquire an exclusive_lock (reads & writes are blocked) for the entirety of the transaction. For Hudi this could be at the table, partition or file level - each has different trade-offs. Additionally, for a system like Hudi, where a writer modifies a batch of data and has no prior understanding of what that batch is, this would mean a table level lock for the entire duration of the writer. 
        1. Pros
          1. Achieves our goal of handling update conflicts
        2. Cons
          1. May not scale depending on level of locks chosen
          2. Can eventually make all transactions serial by using table level locks for the entirety of the transaction thus turning it into a serial operation anyways. 
    2. OPTIMISTIC LOCKING
      1. Allow two concurrent transactions to proceed by reading the latest committed snapshot at the beginning of their transaction. Before committing, acquire a table level exclusive_lock (reads & writes are blocked) and perform conflict resolution to check whether there are conflicting updates. If yes, allow the first transaction (possibly using timestamp ordering) to continue to commit and abort the second transaction (which has to be retried). 
        1. Pros
          1. Achieves our goal of handling update conflicts
          2. Scales well since only a table level lock is needed
          3. Lock is acquired only for a small duration of time so allows transactions to run and commit in parallel if no conflicts
          4. Works well for cases where there is no or light contention
        2. Cons
          1. Requires an external server for locks 
          2. Not performant when there is heavy contention
    3. LOCK FREE
      1. Since MVCC keeps a version of every modified data, two concurrent transactions could be allowed to proceed and commit concurrently. With all versions of data present, a conflict resolution and reconciliation mechanism can be applied later by the reader to get the latest, correct state of the datum.  
        1. Pros
          1. Achieves our goal of handling update conflicts
          2. No locks are required
          3. Scales well
        2. Cons
          1. Reader has to do additional reconciliation work, which may require additional metadata, thus possibly increasing disk and compute cost. 

Concurrency vs Parallelism


Concurrency

Wikipedia -> “In computer science, concurrency is the ability of different parts or units of a program, algorithm, or problem to be executed out-of-order or in partial order, without affecting the final outcome. This allows for parallel execution of the concurrent units, which can significantly improve overall speed of the execution in multi-processor and multi-core systems. In more technical terms, concurrency refers to the decomposability property of a program, algorithm, or problem into order-independent or partially-ordered components or units.”

In Hudi’s context, with resource = file, multiple writers attempting to change the same file should be able to do so without affecting the final outcome. 

Parallelism

Wikipedia -> “Parallel computing is a type of computation where many calculations or the execution of processes are carried out simultaneously. Large problems can often be divided into smaller ones, which can then be solved at the same time. There are several different forms of parallel computing: bit-level, instruction-level, data, and task parallelism. Parallelism has long been employed in high-performance computing, but has gained broader interest due to the physical constraints preventing frequency scaling.”

In Hudi’s context, with resource = file, multiple writers attempting to mutate the table can change different files or partitions concurrently, as long as they do not overlap on the same resource. 

Read more here to understand concurrency control

Guarantees

Hudi uses MVCC to ensure snapshot isolation and provide ACID semantics between a single writer and multiple readers. 

In this RFC, we propose to support a feature to allow concurrent writing to a Hudi table. The following guarantees provided by Hudi's single writer model will NOT be guaranteed in multi-writer mode:

  1. Integrity Constraints of unique records across multiple writers during inserts
    1. If multiple writers are writing to different files, Hudi cannot guarantee uniqueness of keys across partitions, unlike the single writer model. Ensuring unique keys is left up to the users
  2. Read serializability across partitions
    1. Since different writers to different files can finish at varying times, thus committing data written to files in any order, Hudi cannot guarantee read serializability.
  3. Global index support (only HbaseIndex)
    1. Since Global Index (e.g. HbaseIndex) requires a unique key across all partitions in the table, Hudi cannot support Global Index for tables requiring multi writer due to constraint (1) above. Note, GlobalSimpleIndex works fine since it first partitions based on HoodieKey and then checks the record key per partition. 

Implementation

The good news is that Hudi's MVCC based reader/writer snapshot isolation has already laid a great foundation for supporting multiple writers via serializable snapshot isolation. The following section describes what those features are and how we plan to alter them to support multiple writers. 

Inline rollback

The current Hudi writer job supports automatic inline rollback. The rollback feature is used to clean up any failed writes that may have happened in the immediate past runs of the job. The clean up process could involve cleaning up invalid data files, index information as well as any metadata information that is used to constitute the Hudi table. Hudi does this by checking whether a commit under the .hoodie directory is a completed commit or an inflight commit. If it is an inflight commit, it performs the necessary rollback before progressing to ingest the next batch of data. When multiple different clients attempt to write to the table, this can result in jobs rolling back each other's inflight data, leading to chaos and corruption. To support rollback with multiple writers, we need a way to avoid this. To remove inline rollbacks, a few things need to be done: 

  1. Ensure all valid data is always guarded by completed instants on the timeline. Any failed writes to data or metadata become garbage data which are innocuous and don’t affect correctness.
  2. This kind of garbage data would otherwise lie around the DFS forever, taking up unnecessary space and contributing to file count. We need a "cleaner" process to clean up this data at a later instant of time (which is what the inline rollbacks were doing in the first place).

To clean up this garbage data, the problem boils down to detecting failed writers and having the ability to clean up these failed writes.

Here we propose to implement a heartbeat mechanism for every writer. Throughout its lifetime, the writer emits a heartbeat to the hoodie timeline to inform any other process (here, the cleaner job) of its continued execution. A cleaner/rollback process looks at all outstanding writer requests on the timeline and checks whether there are any failed writers (writers which haven't provided a heartbeat within the allocated time range). If yes, it cleans up those failed writes, thus freeing up DFS space occupied by garbage data and removing inflight files from the timeline. This model helps keep the timeline clean and doesn't let failed writes pollute the timeline over time. 

Additionally, writer jobs can hang for several reasons, such as long GC pauses or other code bugs. In this situation, due to lack of heartbeats, a cleaner may consider this a failed write and clean up its data and metadata. Hence, we also need to implement similar checks on the writer before committing, to ensure we abort such writers. A minimal sketch of the heartbeat emission and expiry check follows. 
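The sketch below assumes heartbeat files live under a .hoodie/.heartbeat folder whose modification times serve as the heartbeat; the layout and names are illustrative, not a finalized design.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative heartbeat mechanism: the writer periodically touches a file
// named after its instant; the cleaner treats a writer whose heartbeat file
// has gone stale as failed and rolls back its inflight write.
class WriterHeartbeat {
  private static final long HEARTBEAT_INTERVAL_MS = 60_000L;
  private static final int TOLERABLE_MISSED_BEATS = 2;

  private final FileSystem fs;
  private final Path heartbeatDir; // e.g. <table>/.hoodie/.heartbeat

  WriterHeartbeat(Configuration conf, Path tablePath) throws IOException {
    this.fs = tablePath.getFileSystem(conf);
    this.heartbeatDir = new Path(tablePath, ".hoodie/.heartbeat");
  }

  // Called from a background thread in the writer; re-creating the file
  // bumps its modification time, which serves as the heartbeat.
  void emit(String instantTime) throws IOException {
    fs.create(new Path(heartbeatDir, instantTime), true).close();
  }

  // Called by the cleaner: a writer is considered failed once its heartbeat
  // is older than the interval plus a tolerance for missed beats.
  boolean isExpired(String instantTime) throws IOException {
    Path beat = new Path(heartbeatDir, instantTime);
    if (!fs.exists(beat)) {
      return true; // no heartbeat was ever emitted for this instant
    }
    long lastBeatMs = fs.getFileStatus(beat).getModificationTime();
    long staleAfterMs = HEARTBEAT_INTERVAL_MS * (TOLERABLE_MISSED_BEATS + 1);
    return System.currentTimeMillis() - lastBeatMs > staleAfterMs;
  }
}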


Ingestion Retry

We CANNOT use the same commit time to retry a failed batch of ingestion any more. Without inline rollbacks, we are unable to remove failed writes from a previous unsuccessful job; hence, every time a job tries to ingest data, it has to use a new commit time. 

Bootstrap Hudi table

Users cannot run bootstrap in parallel. They need to first finish bootstrapping a non-hudi table into Hudi, and only then start single or multiple writers. Rollbacks of failed bootstraps are also inline at the moment. To support the option proposed above, the inline rollback for bootstraps will also need to be removed, following the same clean up model as inline rollback for regular writers. 

Updating Metadata Table

As of RFC-15 (HUDI-1292), there will only be a single writer to the consolidated metadata. We need to ensure that only a single writer updates the metadata table at any point in time. Applying the same changes twice is OK. The reconciliation of the metadata will be done by the metadata reader, which will see unique instant times and corresponding metadata.

Incremental reads

With multiple writers, there is no guarantee of ordering of data since different writers can finish writing at different instants of time. Here, we have the following options for incremental reads.

Option 1

Allow out of order reading (not serializable)

Pros

  • This allows fresh data to be incrementally read
  • A job working on a partition may itself want to incrementally consume that data ASAP down the road

Cons

  • Violates read serializability currently supported by Hudi single writer model
  • Requires a special timeline API that users can use to get the last commit time to store as a checkpoint, to avoid data loss from previously running writer jobs
  • May require similar changes for post commit hook which allows commit times to be published and downstream jobs to move checkpoints
  • Since we cannot move the checkpoint until all previous writers have finished, incremental reads will reprocess records until the checkpoint moves forward

Option 2

Fence until all previous commits have succeeded before allowing incremental reads

Pros 

  • Guarantees read serializability
  • Works as is: no special timeline API is needed since checkpoints will not move forward until all previous commits have succeeded. This is the model supported for READ_OPTIMIZED view incremental reads on MERGE_ON_READ tables, which suffer from the same issue.

Cons

  • Incremental reads are blocked on all writers and suffer from straggler-writer problems. Consider a situation where a backfill job started at time t1. This job could take many hours to finish, say at tn. In the meantime, many writers of fresh data have finished over times t2, t3, ..., tn. With this option, we would have to wait until the backfill finishes at tn to incrementally read any of the new data.

Option 3

Provide incremental reads in order of writer finish time rather than commit time. 

This has the same pros and cons as Option 1, with an added API to move checkpoints based on a timeline different from the commit timeline. Essentially, you can imagine an overlay timeline called CompletedCommitOrderedTimeline() which provides time based on completed jobs, allowing checkpoints to move forward while preserving the commit timeline for Hudi operations. A sketch follows. 
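The types below are hypothetical stand-ins, assuming each completed commit records its completion time; this is not an implemented Hudi API.

import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical overlay timeline from Option 3: instants are ordered by when
// the commit completed, so incremental-read checkpoints can advance even
// though the commit (start) timeline is preserved for all other operations.
class CommitInstant {
  final String commitTime;     // start-ordered instant on the Hudi timeline
  final long completionTimeMs; // when the commit actually finished

  CommitInstant(String commitTime, long completionTimeMs) {
    this.commitTime = commitTime;
    this.completionTimeMs = completionTimeMs;
  }
}

class CompletedCommitOrderedTimeline {
  static List<CommitInstant> orderByCompletion(List<CommitInstant> completed) {
    return completed.stream()
        .sorted(Comparator.comparingLong((CommitInstant i) -> i.completionTimeMs))
        .collect(Collectors.toList());
  }
}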


In this implementation, we will allow out of order reading and leave the serialization of incremental reads to users. As a follow up to this RFC, we will implement Option 3.

Scheduling Table Management Services

Scheduling of table management services such as cleaning, archiving, compaction and clustering is currently done within a single writer model. The assumption is that a single controller performs these operations, ensuring we can guarantee ACID between readers and writers while running all operations asynchronously using the MVCC pattern. 

With multiple writers, this assumption is violated. For any such service to be scheduled, the scheduler needs a "static" view of the table so it can perform the logic necessary to schedule an operation. We propose to acquire a lock when scheduling such operations. While this lock is held, NO other writer can change the state of the table (i.e., successfully commit), providing a "static" view of the table to the scheduling operation. Once the scheduling is done, the lock is released and other concurrent writers are free to acquire the lock and perform conflict resolution. A minimal sketch is shown below. 
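The interfaces below are illustrative, not Hudi's actual API; this simply sketches the lock-guarded scheduling described above.

// Illustrative lock provider; in practice this would be backed by an
// external lock service.
interface LockProvider {
  void lock();
  void unlock();
}

class TableServiceScheduler {
  private final LockProvider lockProvider;

  TableServiceScheduler(LockProvider lockProvider) {
    this.lockProvider = lockProvider;
  }

  // Build and persist the service plan (e.g. a requested compaction instant)
  // against a "static" view of the table: no writer can commit while the
  // lock is held.
  void schedule(Runnable buildAndSavePlan) {
    lockProvider.lock();
    try {
      buildAndSavePlan.run();
    } finally {
      lockProvider.unlock(); // writers resume and perform conflict resolution
    }
  }
}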

In a way, you can think of a Table Management Service as just another concurrent writer to the table. We do acknowledge that users may require a priority ordering when choosing to resolve a conflict between a less important "clustering" activity and an important "update" activity. The framework will allow implementing such a priority ordering, but the details of how it will be implemented are out of the scope of this RFC and may be followed up in a different RFC.

Unique instant times

Option 1

We need a test-and-set kind of mechanism to provide unique instant times to multiple writers.

Pros

  • No loopholes and is guaranteed to provide unique commit times in all cases

Cons

  • Requires atomic operations that are not available on all cloud stores

Option 2

Increase the granularity of commit times to milliseconds

Pros 

  • Works for all kinds of cloud stores

Cons

  • We can see clashes if 2 jobs start in the same millisecond (although the probability is very low, and this option is used by OLTP systems like C* to resolve the latest record)
  • Requires an upgrade/migration of the timeline version, which can make it slightly complicated from an operational standpoint

In this implementation, we will continue to use SECONDS level granularity for generating hoodie commit times. In a follow up RFC, we will describe and implement Option 2. For illustration, the sketch below contrasts the two granularities.
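The class below is a hypothetical stand-in; the second-granularity pattern mirrors Hudi's current yyyyMMddHHmmss commit time format, and Option 2 would simply extend it with milliseconds.

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical sketch contrasting Option 2 with the current behavior: two
// writers starting within the same second collide under SECONDS_FORMAT but
// almost never under MILLIS_FORMAT.
class InstantTimeGenerator {
  private static final DateTimeFormatter SECONDS_FORMAT =
      DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
  private static final DateTimeFormatter MILLIS_FORMAT =
      DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");

  static String newInstantTime(boolean millisGranularity) {
    return LocalDateTime.now().format(millisGranularity ? MILLIS_FORMAT : SECONDS_FORMAT);
  }
}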

Concurrency Control

Option 1

Optimistic Concurrency using Locking with an External Server, with Conflict Resolution (multiple writers can succeed)

Step 1: Read the latest committed data from DFS or external server

Step 2: Perform write

Step 3: Acquire a table level LOCK before committing new data

Step 4: Before committing, check if the latest committed data on the DFS or external server is the same as in Step 1. If YES, continue to update the commit time atomically, reflect the new commit time for readers, and release the LOCK. If NOT, compare the NEW_FILES_BEING_COMMITTED vs the FILES_MUTATED_BY_COMMITS since the last committed data read in Step 1. If there are overlapping files, abort the write. If there are no overlapping files, finish the commit on DFS and update max(current_commit_time, latest_commit_time_from_external_server) if applicable. A minimal sketch of this overlap check follows. 
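The helper below assumes the caller has already assembled the relevant file sets from commit metadata; all names are hypothetical.

import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of the Step 4 conflict check: a pending write may commit only if it
// touches no file mutated by commits that completed after its Step 1 read.
class ConflictResolver {
  static boolean canCommit(Set<String> newFilesBeingCommitted,
                           List<Set<String>> filesMutatedByCommitsSinceRead) {
    Set<String> overlap = new HashSet<>();
    for (Set<String> mutated : filesMutatedByCommitsSinceRead) {
      overlap.addAll(mutated);
    }
    overlap.retainAll(newFilesBeingCommitted); // intersect with the pending write's files
    return overlap.isEmpty(); // any overlap => abort and retry the write
  }
}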

Option 2

Optimistic Concurrency using atomic renames with Conflict Resolution

Step 1: Read the latest committed data from the hoodie timeline

Step 2: Perform write

Step 3: Acquire a lock under .hoodie/locks before committing new data. Lock IDs are in monotonically increasing order; if there are no locks present under .hoodie/locks, the first LockID = 1. We need unique LockIDs for each transaction since we will store the commit time in the lock file for later purposes; since a reader will read that file, we cannot modify the file once created. If there is already an acquired lock present, wait and retry until timeout. Create (if it does not exist) the file lock1.acquired and write the commit time that acquired the lock into the file. 

Step 4: a) Ensure the lock is acquired (the file name is lock1.acquired); b) compare the commit time in lock1.acquired with the current commit to ensure the current job is the lock holder. 

Before committing, check if the latest committed data on the timeline is the same as in Step 1. If YES, continue to update the commit time atomically and reflect the new commit time for readers. If NOT, compare the NEW_FILES_BEING_COMMITTED vs the FILES_MUTATED_BY_COMMITS since the last committed data read in Step 1. If there are overlapping files, abort the write and unlock by creating lock1.released. If there are no overlapping files, finish the commit on DFS and create lock1.released. A sketch of this lock-file protocol is shown below.
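This sketch follows the lock-file layout described above, assuming the DFS provides an atomic create-if-not-exists; it is illustrative, not an implemented Hudi API.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative DFS lock for Option 2: lock<id>.acquired is created atomically
// with the commit time inside it; lock<id>.released makes the lock available.
class DfsLock {
  private final FileSystem fs;
  private final Path locksDir; // <table>/.hoodie/locks

  DfsLock(Configuration conf, Path tablePath) throws IOException {
    this.fs = tablePath.getFileSystem(conf);
    this.locksDir = new Path(tablePath, ".hoodie/locks");
  }

  // Step 3: atomic create-if-not-exists; the commit time is recorded so that
  // Step 4 can verify the current job is still the lock holder.
  boolean tryAcquire(long lockId, String commitTime) {
    Path acquired = new Path(locksDir, "lock" + lockId + ".acquired");
    try (FSDataOutputStream out = fs.create(acquired, false)) {
      out.write(commitTime.getBytes(StandardCharsets.UTF_8));
      return true;
    } catch (IOException alreadyLocked) {
      return false; // another writer holds the lock; wait and retry until timeout
    }
  }

  // Creating lock<id>.released marks the lock as available again.
  void release(long lockId) throws IOException {
    fs.create(new Path(locksDir, "lock" + lockId + ".released"), true).close();
  }
}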


With all the above approaches, there is a possibility of the job failing mid-way and not being able to release the lock. We need a feature to allow expiring these locks. Here, we will rely on the heartbeat to determine when to expire the lock. Every time a new job wants to acquire the lock, it will do 2 things: a) read the latest heartbeat and ensure the lock-holding job is still alive; b) check if the lock is available (filename is lock<id>.released). If there has not been a heartbeat within the specified timeframe, the following action will be performed to release the lock:

  1. Release the lock by creating lock<id>.released

After this, the job will proceed to contend for and acquire the lock as usual. Note that every time a job tries to commit, it has to ensure 2 things: a) it goes from inflight → commit; b) it still holds the lock it had acquired.

Assumptions

  1. DFS supports atomic renames
  2. DFS supports atomic file create

Since Hudi is supported on all popular cloud stores that implement an HDFS compatible API, we cannot make any assumptions about the underlying implementations. Specifically, implementations such as S3 do NOT support atomic renames. Hence, this solution will not be useful to a large population of Hudi users. 

In this RFC, we propose to implement Option 1.


Dependency/Follow up

With multiple writers, there is no guarantee on the ordered completion of commits (for conflict-free multiple writers). Incremental reads depend on monotonically increasing timestamps to ensure no insert/update is missed, and this guarantee is violated with multiple writers. To be able to use multiple writers, we will introduce a new notion on the hoodie timeline for each commit, END_COMMIT_TIME, alongside the START_COMMIT_TIME that is already used for performing commits. Incremental reads will depend on END_COMMIT_TIME to tail data and also publish it to downstream systems so they can checkpoint.

Rollout/Adoption Plan

Once the proposed solution is implemented, users will be able to launch multiple writers.

Test Plan

1) Test the following scenario with a long running job

// Add compaction, clustering
// Check archival kicking in
// Schema Evolution ? (older commit vs new commit)
For TableTypes in (COW, MOR)
	For ConcurrencyMode in (SingleWriter, OCC (with non-overlapping & overlapping file ids))
		Test 1:  EAGER cleaning, without metadata (validate no regression)
		Test 2:  EAGER cleaning, with metadata (validate no regression)
		Test 3:  EAGER cleaning, with metadata, with async cleaning (validate no regression)
		Test 4:  LAZY cleaning, without metadata
		Test 5:  LAZY cleaning, with metadata
		Test 6:  LAZY cleaning, with metadata, with async cleaning
		Test 7:  Incremental Pull (out of order execution of commits for OCC)


2) Add MultiWriter test cases

3) Enhance Hoodie Test Suite to test Spark Datasource with Locks


Failures & Build rollback

Here, the recommendation is to first upgrade to a reliable build that ships with the optimistic_concurrency_control feature. Once you have certified and rolled out your build in production, you can enable multiple writers by setting a few configs. 

If for some reason there are failures, you can turn off optimistic_concurrency_control and revert to the single_writer model. Any failed writes will be seamlessly cleaned up by the writer. For illustration, a config sketch follows.
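The keys below are an assumption in this sketch (they match the names used in later released Hudi versions); verify the exact config names against the documentation for your build.

hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
hoodie.write.lock.zookeeper.url=<zk_quorum>
hoodie.write.lock.zookeeper.port=<zk_port>
hoodie.write.lock.zookeeper.lock_key=<table_name>
hoodie.write.lock.zookeeper.base_path=<zk_base_path>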

Turning on Scheduling Operations

By default, the scheduling of operations is enabled within each job, for backwards compatibility with current users. Users need to ensure they turn on optimistic_concurrency_control for all writers.

Future Work (Lock Free-ish Concurrency Control)


As discussed before, there are 3 ways to achieve serializable snapshot isolation with multiple writers. Here, we chose to implement a lock based optimistic concurrency control mechanism that allows concurrent writers to update the state of the table at the same time, handling conflict resolution just before committing the transaction. Although we explained the many technical pros and cons of this approach above, we have not discussed some of the business-driven pros and cons of using optimistic locking. 

Consider this real world situation for one of the companies using Hudi. Here, there are three different writers to a Hudi table.

  1. The first writer is continuously ingesting new data. This could be change logs or Kafka logs ingested into the table. 
  2. The second writer, based on some business logic, is encrypting data written in the last X hours. 
  3. The third writer is a backfill job that is attempting to "correct" some of the data written in the last X days.


Depending on the nature of the data, when using optimistic concurrency control, there can be multiple situations of write-write conflict. If an update conflict arises, the optimistic concurrency implementation allows one transaction (the one that finishes first, possibly decided by timestamp ordering) to succeed while aborting all other conflicting transactions. The operations done by those transactions will therefore need to be retried. Again, depending on the nature of the data, this can keep happening repeatedly, never allowing some operations to succeed and creating the following problems for your organization: 

  1. Violating SLAs for operations - If your organization has defined SLAs for when data should be corrected or encrypted, such conflicting writes may end up aborting those jobs and violating business SLAs
  2. Wasted resource consumption - All jobs that are aborted need to be retried from the beginning, thus wasting CPU and memory cycles and increasing organizational costs
  3. Locking infrastructure bottlenecks - If you have thousands of jobs running at minute level frequencies, this can result in tens of thousands of requests to the lock provider. Not only does this require the organization to manage, operate and scale the lock provider system, it also creates a single point of failure for your stateless jobs. 
  4. Hard to enforce priority - Depending on business SLAs, you might want to assign different priorities to different jobs based on importance. With optimistic concurrency control, priorities can only be applied when a job is about to commit, since there is no prior state


Here, we'd like to pause and revisit our choice of handling update conflicts. Our isolation requirements have not changed. A LOCK_FREE snapshot isolation remains to be explored as a possible alternative to OPTIMISTIC_CONCURRENCY. As discussed before, there are 2 ways to handle update conflicts: 1) conflict avoidance - done by systems supporting serializable isolation; 2) conflict resolution - done either with locks or lock free by readers. Let's discuss the LOCK FREE conflict resolution model. 


Let's first understand the types of consistency models (ACID) that ensure correctness.

  1. Strong Consistency - In a strongly consistent system, a change applied to a set of data is ONLY applied if we can ensure that any failure will not affect the state of the system as seen by any readers of the system. In a single node system this is straightforward to understand, but it becomes extremely tricky in a distributed system. Here, usually a 2PhaseCommit or 3PhaseCommit protocol is used to ensure the system is strongly consistent. This requires a consensus protocol such as Paxos or Raft that can ascertain the eligibility of all nodes in the distributed system to agree on and accept the changes applied. Here, availability is traded for consistency in the CAP theorem. 
  2. Eventual Consistency - In an eventually consistent system, a change applied to a set of data is applied even when parts of the system fail, and readers may see different states (stale data) of the system for a "period" of time. Usually, systems supporting this kind of consistency model use a conflict resolution methodology, for example Latest Writer Wins (LWW) using timestamps as the conflict resolver, or a quorum based methodology comparing data from a majority of the nodes. Such systems generally do NOT require any consensus based protocol and trade consistency for availability in the CAP theorem. 


All of this is applicable to us when we think of the problem of update conflicts from multiple writers as analogous to strong vs eventual consistency. To achieve LOCK FREE snapshot isolation, we need to be able to apply different updates to the same datum without any synchronization or consensus (here, locks), and still finally get the correct, consistent view of the datum.

Strong Eventual Consistency

Another option to achieve eventual consistency is to explore Conflict-Free Replicated Data Types (CRDTs). In a CRDT model, data can be updated independently and concurrently without the need for any coordination/consensus, but with a predefined set of rules that are applied during conflict resolution at a later stage. 

This is like eventual consistency, but with deterministic outcomes for concurrent updates:

  1. No need for background consensus
  2. No need to rollback
  3. Available, fault-tolerant, scalable

A good example is the Last Writer Wins (LWW) methodology with timestamp based resolution, used to achieve eventual consistency. One of the problems with timestamp based resolution is that all servers need to be clock synchronized using NTP, with no scope for any clock drift. Ensuring this is extremely hard and requires special systems. Note that the LWW methodology assumes the timestamp of the write is associated with the lowest granularity of the datum; in databases, this is generally a column of a row.


Since these methodologies defer merging to the reader (realtime view), all of the following documentation assumes the hoodie table type to be MergeOnRead.

CRDTs

Hudi supports partial row updates, which means 2 different writes/updates could be updating 2 different columns of the same row. Since input data to Hudi generally comes from a different source such as Kafka, which may receive out of order writes, Hudi by itself cannot assign ordered timestamps to these updates. Hence, LWW based on timestamps does not directly work here. 

If, along with timestamp based resolution, we also had a deterministic merge function that allows conflict resolution of these 2 updates, we could apply this function at a later stage and provide correct results. An implementation of such a merge function is a CRDT. 

In a CRDT data structure, updates are

  1. Commutative → Order of updates does not matter
  2. Idempotent → Applying same updates repeatedly does not matter
  3. Associative → Order of groups of updates does not matter

This merge function can be implemented by users to ensure a correct ordering of updates based on the internal semantics of the data, which may be timestamps or other such logic. A toy sketch follows. 
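Below is a toy per-column last-writer-wins register sketching these properties; the classes and the version field are illustrative stand-ins, not a Hudi payload implementation. The version comes from the data's own semantics (e.g. a source event time), not the writer's clock.

import java.util.HashMap;
import java.util.Map;

class LwwCell {
  final long version;  // ordering token from the data itself
  final Object value;

  LwwCell(long version, Object value) {
    this.version = version;
    this.value = value;
  }
}

class LwwRecord {
  final Map<String, LwwCell> cells = new HashMap<>();

  // Merge is commutative, idempotent and associative: applying updates in
  // any order, any number of times, yields the same merged record.
  LwwRecord merge(LwwRecord other) {
    LwwRecord result = new LwwRecord();
    result.cells.putAll(this.cells);
    other.cells.forEach((col, cell) -> result.cells.merge(col, cell, (a, b) -> {
      if (a.version != b.version) {
        return a.version > b.version ? a : b;
      }
      // deterministic tie-break keeps the merge commutative on version collisions
      return String.valueOf(a.value).compareTo(String.valueOf(b.value)) >= 0 ? a : b;
    }));
    return result;
  }
}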

To summarize, using a CRDT we can allow multiple writers to commit concurrently, without needing a lock to perform conflict resolution, while still providing ACID semantics. When readers construct the latest state of the table, a CRDT ensures the 2 different snapshots of the data get applied and a deterministic, serial order can be maintained for update conflicts.

Since CRDTs have to be commutative, the order of "merge" does not matter for generating the correct result after applying the rules.


NOTE: One of the problems of eventual consistency with CRDTs arises with replication across multiple nodes, making it not ACID compliant. Since Hudi relies on the underlying DFS for replication, this concern does not apply. 


Total Ordering and Overwrite Latest Payload 

Although CRDTs possess the above qualities, without a time based ordering, data can be committed in any order and readers can view the data in any order. This violates Hudi's current single-writer serialized snapshot isolation, in which writes always happen in monotonically increasing timestamp order and that is how readers view the data. Additionally, Hudi already supports some out of the box methodologies to merge existing data with newly written data, such as OverwriteLatestPayload. Since these cannot rely on the internal semantics of the data (as that is dependent on user payloads) and OverwriteLatestPayload is NOT a CRDT, we need a mechanism / rule (a CRDT of sorts? (smile)) to ensure these merge methodologies generate the same correct, expected results during concurrent writes as they would during serial writes.

Here, Hudi's commit timestamp comes to the rescue. Hudi has a _hoodie_commit_time associated with every record. A default implementation of a CRDT would be to provide an ordered set of updates to a datum in order of commit times; applying the merge function in this order creates a CRDT. For Hudi's default implementation, the merge function available is the overwrite_latest_payload, which simply takes the latest record. 


NOTE: Supporting this kind of ordered merge may require first sorting the LogBlocks in increasing order of START_COMMIT_TIME, since updates can happen out of order, and then applying the OverwriteLatestPayload. A small sketch follows.
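Hudi instant times compare lexicographically, so a string sort suffices; LogBlock below is an illustrative stand-in, not Hudi's actual log block type.

import java.util.Comparator;
import java.util.List;

// Illustrative ordering step: sort log blocks by the instant that produced
// them before applying OverwriteLatestPayload, so out-of-order completion
// does not change the merged result.
class LogBlock {
  final String startCommitTime; // instant time of the commit that wrote this block

  LogBlock(String startCommitTime) {
    this.startCommitTime = startCommitTime;
  }
}

class OrderedMerge {
  static void sortForMerge(List<LogBlock> blocks) {
    blocks.sort(Comparator.comparing(b -> b.startCommitTime));
  }
}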


START_COMMIT_TIMESTAMP vs END_COMMIT_TIMESTAMP

Let's take the example of the overlapping transactions below. 



Consider 4 transactions T1, T2, T3, T4 starting and ending at different instants of time. Ideally, as with general transactions in a database, one would assume the following serial order for these transactions if they were to be executed in a serial fashion.


This works fine for general purpose database transactions where the input (updates to rows) is already prepared before starting the transaction. In Hudi, this is not true. Due to the LAZY evaluation of data (Dataframes, Datasets etc.), when the actual update is prepared can differ from the transaction start time. Hence, ordering transactions / updates on the start_commit_timestamp may not yield correct results. 


An alternative option is to order them serially based on transaction end time (end_commit_timestamp); the serial order would then look as follows. 



As you can see, T2 jumps from being the second transaction to the last one. 


Now it is hard to determine which is the correct order to choose, start or end commit timestamp, since we do not know when the input data is actually materialized. Assuming that materialization of data is based on some kind of checkpoint, and that the latest snapshot read for a particular transaction is based on the transaction's start timestamp (since that can be decided at the beginning, in the AbstractHoodieWriteClient), we choose to order transactions based on START_COMMIT_TIMESTAMP.

Note, implementing this requires changes in Hudi to ensure that the latest snapshot of the table is sealed at the start of the transaction and does not change during the course of the transaction, all the way to the end. Without this change, the above assumptions are violated, which would leave the table in an inconsistent state. More specifically, this requires sealing the ActiveTimeline and not reloading it during the entirety of the transaction. 


Table Services


Scheduling & Executing

Although we can solve the problem of update conflicts using CRDTs, there still exists the problem of how to coordinate other table services with these concurrent writes. Services such as compaction and clustering require a single writer or a table level lock to get the current state of the system before such a service is scheduled. In the current implementation, no concurrent writer can be started while scheduling these services. Let's take a look at the underlying assumptions and limitations: 

  1. Cleaning - No problem, since it only requires the state of the table in the past
  2. Compaction
    1. Scheduling - A phantom base file is created based on the scheduled compaction instant time. If multiple writers start at the same time as scheduling, inserts and updates get written to older log files, which may not get picked up by the compaction process, thus resulting in data loss.
    2. Execution - No problem if the scheduling is taken care of
  3. Clustering
    1. Execution - In its current state, clustering cannot execute in parallel with concurrent updates. If clustering is scheduled on some file groups and updates come along for those file groups, the writes will fail until clustering finishes. Once implemented, this will NOT be a problem, just like compaction.
    2. Scheduling - The scheduling of clustering while concurrent writes are happening can present some race conditions leading to data loss.
  4. Restore
    1. Scheduling - N/A
    2. Execution - With out of order updates being applied, we cannot know for sure what the correct point in time is to restore the data to without compromising the "correctness" of the table. Hence, we leave restore as an advanced option for users who can either a) understand the trade-off that doing a restore may not result in consistent, correct data, or b) manage the different update processes themselves so that they know a good point in time to restore to without compromising correctness. 


To solve the issue of scheduling table services, we have the following options


Option 1: Brief, table level locks for all writers and table services - writers take a table level lock for the duration of creating the .commit file under the hoodie folder. Note, this is NOT optimistic concurrency; the lock is just meant to "seal" the state of the table when committing data to it. Similarly, scheduling a table service takes the lock to scan the table and create the plan. 

Option 2: Use monotonic timestamps to create ordered static views of the file system


Option 2 implementation 


BASE FILE (F1_C1) + LOG_FILE (F1_L1_C1).<sequence_id>: For HDFS, we append multiple log blocks with different commit times inside the same log file. It is then hard to know which commits have gone into the log file just by its name. Instead, we could adopt the following changes:

  1. No more appends - This is common to all cloud stores anyway, and we should simply enforce it on all DFSs
  2. Rename the log file naming format - (F1_L1_C1).<sequence_id>: change the sequence_id to the actual commit time that adds the log file, i.e., something like (F1_L1_C1).<commit_timestamp>


Now, consider the following sequence of operations without any LOCKS.

C1 C2 C3.inflight C4 C5.inflight 


Now, a compaction is scheduled. 

C1 C2 C3.inflight C4 C5.inflight  COMPACTION.schedule.C6 C7.inflight

We should be able to get the past state of the table up to the latest point on the timeline without any inflights. Basically, we want to avoid any on-going write to a log file, so we want the latest completed state of the table. At this point, we know which base_file + log files to compact. The problem is how to ensure that log files whose base commit is the previous compaction commit, and to which future writes are still happening, get picked up by the next compaction. Here, we simply add some logic to the HoodieTableFileSystemView during the building of FileSlices: in the collection of all log files, when we slice it into the group belonging to the Cn compaction commit and the group belonging to the Cn-1 compaction commit, we can order them based on commit timestamp and simply group them together. This way, on-going commits to log files whose base commit is the previous compaction commit timestamp will be "fast forwarded" to the FileSlice whose base file carries the new compaction commit. A toy sketch of this grouping follows. 
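The sketch below assumes log files are named with the commit timestamp that added them (per the naming change above); the class and method names are hypothetical. Hudi instant times compare lexicographically.

import java.util.ArrayList;
import java.util.List;

// Toy sketch of the "fast forward" rule: a log file whose commit timestamp is
// at or after the new compaction instant is grouped into the file slice with
// the new compaction's base file; earlier ones stay with the previous slice.
class FileSliceGrouper {
  static List<String> fastForwardedLogCommits(List<String> logCommitTimes,
                                              String compactionInstant) {
    List<String> newSliceLogs = new ArrayList<>();
    for (String commitTime : logCommitTimes) {
      if (commitTime.compareTo(compactionInstant) >= 0) {
        newSliceLogs.add(commitTime); // written concurrently with or after compaction
      }
    }
    return newSliceLogs;
  }
}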

Versioning (Cleaning)


With a log based model and concurrent updates happening on the table at different instants of time, how many and which versions to keep becomes a challenge. We cannot remove or clean older versions of data until we know it is "safe" to do so. This safety depends on whether the latest, correct update version has been recorded in the log file. 

Additionally, Hudi only supports "cleaning" MergeOnRead tables at the FileSlice level; new updates going to log files cannot be cleaned individually, only at the file slice level. With CRDTs, the same will apply.


Incremental Pulls


Incremental pull allows pulling the changes happening on a dataset. Without multiple writers, there is a monotonically increasing timestamp against which incremental pull can extract the changes since the last timestamp. With multiple writers, as pointed out above in the optimistic concurrency implementation, this is violated. Another important thing to note is that incremental pull currently provides the latest snapshot of a record by merging all intermediate changes that happened from time t1 to t2. 

For CRDT based concurrency management, with updates applied out of order, we want to generate the incremental pull snapshot by applying the same CRDT rules, so that no updates are lost for downstream consumers and they always see a consistent state of the data. NOTE: the OverwriteWithLatestRecord implementation is not a CRDT, and it needs the special handling pointed out above. 

This would mean that the Spark, Presto and Hive MOR incremental pulls have to implement the same semantics of ordering updates as in a RealtimeView and in compaction. 


NOTE that incremental pull based on END_COMMIT_TIME is still required for COPY_ON_WRITE tables using optimistic concurrency control.


Overhead of MergeOnRead (context for advanced readers)

COW vs MOR



Let's dive deeper into the merge cost of MergeOnRead tables. 


COW read path

  • Vectorized retrieval of latest value of columns
  • Predicate pushdown to parquet reader
  • No need to perform "merge" operation
  • No materialization of columns to rows
  • ONE cost for serialization and deserialization
    • Parquet to InternalRow for Spark
    • Parquet to ColumnRow for Presto
    • Parquet to ArrayWriteable for Hive


MOR read path


  • NO vectorized retrieval of the latest value of columns; this will be slower and result in more CPU time for queries
  • Need to perform a "merge" operation, which requires memory and CPU for serde, hash lookups etc.
  • Materialization of columns to rows (the whole row is required), which increases the memory and I/O cost of how much data is read from disk
  • THREE costs for serialization and deserialization
    • Parquet to GenericRecord
    • LogBlock Avro bytes to GenericRecord
    • GenericRecord to InternalRow, ColumnRow, ArrayWriteable for the individual query engines
  • No predicate pushdown even with parquet base files, since reading the full row is required to merge contents


9 Comments

  1. Good write up Nishith. A few clarifications/thoughts.

    • IIUC, inserts across two writers going to the same partition cannot be resolved unless each writer goes through the inserts from the other writer, and this design does not cover that case. If two concurrent writers only have updates/deletes, then yeah we can call it file level concurrency, but what we are proposing here IMO is partition level concurrency/isolation. Would be good to call it out explicitly. Correct me if my understanding is wrong.
    • Can we call out somewhere that point in time queries may return different results even w/ the same timestamp, depending on completion of pending writes.
    • If there are two concurrent writers, c1 and c2, and c1 couldn't get resources whereas c2 completed: can you confirm that either c1 sees all data from c2 or none. We don't want a scenario where c1 sees c2's data for some partitions alone, but not for others.
    • Can you go over how the source ordering field will pan out wrt concurrent writers.
    • Can you help me understand what happens in this case (related to the source ordering field). Let's say we have two concurrent writers, where c1 is deleting rec_1 and c2 is inserting rec_1 again. If done sequentially and queried between c1 and c2, rec_1 will show as deleted; if queried after c2 completes, rec_1 will be returned as an active record. So, with concurrent writing, let's say c2 committed ahead of c1. Remember the source ordering field of c2 > c1 for rec_1. So, when queried after both got committed (base file + delta_c2 + delta_c1), will rec_1 be considered deleted since that's the last committed file, or do we honor the source ordering field and return it as an active record from c2's commit? Do we need any fixes on this end, because from the 2 diffs you have put up, I don't see any changes in this regard. 

    Food for thought.
    Usually I'm not a big fan of locking/doing blocking operations unless otherwise required. What I am trying to tackle here is: if two concurrent commits overlap, then one has to wait for the other to complete, leading to a blocked wait, even if sufficient resources are available. So, trying to think if we can do it async. A rudimentary idea; definitely need to think through in detail, but just putting it out to hear your thoughts.

    If we can have an async callback for commits, which will be called at a later time when the commit is completed / failed. Or in other words, hudi will optimistically commit data, but will not open up the data for queries until certain conditions are met.

    What I am trying to get to is: our data available for consumption will ensure serializability. i.e. if C_n is available for queries, all C_n-1 will also be available. If some C_n-x is still in progress, then C_n will not be available to be queried.

    For eg:
    Let's say C1, C2, C3, C4 are triggered concurrently.
    C1 gets committed and will be immediately available for queries.
    Let's say C3 gets committed next. But it will not be exposed to queries yet, since C2 is not yet complete.
    Next, when C2 gets committed, it unblocks both C2 and C3 for queries. At this juncture, all of C1, C2 and C3 are available to be queried.
    Next, when C4 is committed, all 4 commits are exposed to queries.

    Basically, every commit will go through all commits which are committed but not yet opened up for queries, and open them up if all C_n-1 are completed.

    If users aren't interested in the callback, we could simplify further, as we don't need to keep holding on to those jobs.

    This gives easier reasoning about serializability from a consumption standpoint, given the complications with commits getting exposed in a different order, source ordering complications etc.

  2. In general, I am a bit concerned about exposing individual commits w/o regard to causality. Not sure if we need to let users worry about this, or whether Hudi should take an informed approach and say that hudi will expose commits w/ some ordering guarantees. Or at least give both options to users and let them choose which one best suits them. Open to hearing your thoughts. Vinoth Chandar Nishith Agarwal Balaji Varadarajan

    I am sure you would have thought more on this; would be great to hear your thoughts. 


  3. Nishith Agarwal sivabalan narayanan connected offline. Most of the above questions are already answered in this RFC; for the others, the RFC has been updated to reflect the proposals. 



  4. sivabalan narayanan If we can read up on something like this and frame our discussion around the models listed here, https://jepsen.io/consistency , it will help shape what we can do. 


    > Basically every commit will go through all commits which are committed but not yet opened up for queries, and open them up if all C_n-1 are completed.

    So, this approach of opening up commits as the "holes" in the timeline go away is fair, but it will lead to large delays in publishing of data. Going back to my point above, if providing `SERIALIZABLE` transactions is the goal, then we cannot have more than one commit succeeding, i.e. there is exactly one commit among c1, c2, c3, c4 that succeeds in taking the table forward from the state represented by C0. Others should fail and retry.

    That said, I also don't think it's the right thing in most cases. Employing optimistic concurrency control with large/long running transactions is a bad idea in general, due to wastage of the resources used by the failing writers. 

    I need to flesh this out a bit more, but I think we should add an "end_instant_time" notion to the timeline and provide ordering guarantees based on end time, and not commit start time. With this model, we also have the ability to construct a timeline with all conflicting writes, and we already have a mechanism like `HoodieRecordPayload` which can help us implement a conflict resolution scheme. This approach is much more scalable and closer to event streaming systems, rather than transactional databases (which again typically support much smaller transactions; ask any DBA how hard it is to support long running transactions on a production db).



  5. Nishith Agarwal maybe we can rejig the content a bit, based on the locking approach we are currently pursuing, and we can flesh out a design for the log based concurrency control, which can go out in 0.9.0 or something. This is going to take some actual research and validation. 


  6. On tests, we can start with MOR: lazy cleaning of failed writes, OCC, and run it until enough archival happens, with async cleaning, async compaction and async clustering

    And let's please test this at the data frame level. 

  7. Vinoth Chandar I have updated this RFC to reflect the implementation and also added diagrams, ptal

  8. Nishith Agarwal Nice re-write of the RFC!


    There are several open design items for the lock free-ish CC. But it seems like the only way to deal with this is to have some sense of a transition time in the timeline. We could go ahead with 

    https://github.com/apache/hudi/pull/2580 in the meantime.