Current state:
Discussion thread: here
JIRA: here
Released: <Hudi Version>
The Hudi library enables users to create, manage and modify tables on the DFS using an engine-agnostic client library. This gives clients a lot of flexibility to manage tables by embedding the library in their user code and running it as needed, on whatever schedule suits them. Since there is no “external server” component to managing these tables, it also avoids much of the operational burden that comes with maintaining and scaling such solutions. Hudi currently provides ACID guarantees between a writer and readers: it supports the Snapshot Isolation level and allows only a single client writer to mutate a table on the DFS. This RFC proposes to introduce the ability for concurrent writers to Hudi, allowing file-level concurrency and providing Serializable Snapshot Isolation using optimistic concurrency control.
Using a distributed processing engine like Spark, Hudi can scale horizontally to manage workloads of different sizes. With horizontal scaling, users can maintain similar runtimes and ingestion latencies into Hudi tables even during spiky traffic. Over time, we have noticed a lot of activity and requests from users for adding multi-writer capabilities to the Hudi library, to handle cases not covered by the built-in table services. There are many examples where this is helpful. From users, we have learnt that although scaling Hudi jobs is one way to handle spiky traffic, there are some fundamental business challenges that necessitate multiple writers to a Hudi table. A classic example cited is backfill. Many times, due to either changes in business requirements or data correction, pipelines need to backfill historical data into tables. As one can imagine, backfilling historical data can be orders of magnitude larger than steady-state ingestion throughput. One way to keep data freshness and job runtimes the same is to add more processing power to the same job. But there are cases where this option is not viable. First, such auto-tuning capabilities are often not available to clients, so processing backfill data in the same job risks the latency of incoming new traffic to the table. Second, users are flexible about having different reliability guarantees for fresh, new data showing up in the table vs backfilling old data. Users would rather have high reliability and guaranteed latency for steady-state new traffic and lower guarantees for backfilled data. Additionally, in many organizations these are two completely different teams, one responsible for ingesting new data and another for performing backfills/corrections of old data.
Hudi currently supports a single-writer model and uses MVCC to concurrently update a table via table services such as clustering, compaction and cleaning, thus allowing them to run asynchronously without blocking writers. Using MVCC, Hudi is able to provide Snapshot Isolation guarantees. Let's take a quick look at the different isolation levels and how they rank with respect to vulnerabilities such as dirty reads, non-repeatable reads and phantom reads.
With Snapshot Isolation, readers get repeatable reads (the same result if they query multiple times during the same write/transaction) since each reader works on a "snapshot" of the database, i.e. the specific version of the latest committed data seen at the start of that transaction. This is possible because MVCC keeps multiple versions of data. However, this only holds when there are no write-write conflicts (which can manifest as lost updates or write skew). During a concurrent update of data, since each writer is working with its own "snapshot/version" of the data, snapshot isolation cannot guarantee that no updates are lost if the writers independently modify the same datum (record or file). Snapshot Isolation is hence vulnerable to update conflicts unless a conflict resolution strategy is applied. Update conflicts are generally solved in the following ways (we will not discuss READ_UNCOMMITTED since that violates the basic requirements of Hudi anyway)
Wikipedia -> “In computer science, concurrency is the ability of different parts or units of a program, algorithm, or problem to be executed out-of-order or in partial order, without affecting the final outcome. This allows for parallel execution of the concurrent units, which can significantly improve overall speed of the execution in multi-processor and multi-core systems. In more technical terms, concurrency refers to the decomposability property of a program, algorithm, or problem into order-independent or partially-ordered components or units.”
In Hudi’s context, if the resource is a file, multiple different writers attempting to change the same file should be able to do so without affecting the final outcome.
Wikipedia -> “Parallel computing is a type of computation where many calculations or the execution of processes are carried out simultaneously. Large problems can often be divided into smaller ones, which can then be solved at the same time. There are several different forms of parallel computing: bit-level, instruction-level, data, and task parallelism. Parallelism has long been employed in high-performance computing, but has gained broader interest due to the physical constraints preventing frequency scaling.”
In Hudi’s context, if the resource is a file, multiple different writers attempting to mutate the table can change different files or partitions concurrently, as long as they do not overlap on the same resource.
Read more here to understand concurrency control
Hudi uses MVCC to ensure snapshot isolation and provide ACID semantics between a single writer and multiple readers.
In this RFC, we propose to support a feature that allows concurrent writing to a Hudi table. The following guarantees provided by the Hudi single-writer model will NOT be guaranteed in multi-writer mode
The good news is that Hudi’s MVCC-based reader/writer snapshot isolation has already laid a great foundation for supporting multiple writers via serializable snapshot isolation. The following section describes what those features are and how we plan to alter them to support multiple writers.
The current Hudi writer job supports automatic inline rollback. The rollback feature is used to clean up any failed writes that may have happened in the immediately preceding runs of the job. The clean-up process can involve cleaning up invalid data files, index information, as well as any metadata information used to constitute the Hudi table. Hudi does this by checking whether a commit under the .hoodie directory is a completed commit or an inflight commit. If it is an inflight commit, it performs the necessary rollback before progressing to ingest the next batch of data. When multiple different clients attempt to write to the table, this can result in jobs rolling back each other’s inflight data, leading to chaos and corruption. To support rollback with multiple writers, we need a way to avoid this. To remove inline rollbacks, a few things need to be done
To clean up this garbage data, the problem boils down to detecting failed writers and having the ability to clean up these failed writes.
Here we propose to implement a heartbeat mechanism for every writer. Throughout its lifetime, the writer emits a heartbeat to the hoodie timeline to inform any other process (here, the cleaner job) of its continued execution. A cleaner/rollback process looks at all outstanding writer requests on the timeline and checks if there are any failed writers (writers which haven’t provided a heartbeat within the allocated time range). If yes, it cleans up those failed writes, thus freeing up DFS space occupied by garbage data and removing inflight files from the timeline. This model helps keep the timeline clean and doesn’t let failed writes pollute the timeline over time.
Additionally, writer jobs can hang for several reasons, such as long GC pauses or other code bugs. In this situation, due to the lack of heartbeats, a cleaner may consider this a failed write and clean up its data and metadata. Hence, we also need to implement similar checks on the writer before committing, to ensure we abort such writers.
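To make this concrete, below is a minimal, illustrative sketch of such a heartbeat, assuming heartbeat files live under a `.hoodie/.heartbeat/<instant>` path (the layout and class name are hypothetical; a real implementation would go through the DFS client API rather than java.nio):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;

// Minimal heartbeat sketch on a local filesystem; illustrative only.
public class WriterHeartbeat {

  private final Path heartbeatFile;   // hypothetical layout: <table>/.hoodie/.heartbeat/<instantTime>
  private final long intervalMillis;

  public WriterHeartbeat(Path tablePath, String instantTime, long intervalMillis) throws IOException {
    Path dir = tablePath.resolve(".hoodie").resolve(".heartbeat");
    Files.createDirectories(dir);
    this.heartbeatFile = dir.resolve(instantTime);
    this.intervalMillis = intervalMillis;
  }

  /** Called periodically by the writer (e.g. from a scheduled thread) while it is alive. */
  public void beat() throws IOException {
    Files.write(heartbeatFile, Instant.now().toString().getBytes());
  }

  /** Used by the cleaner: a writer is considered failed if its heartbeat is older than interval + grace. */
  public boolean isExpired(long graceMillis) throws IOException {
    if (!Files.exists(heartbeatFile)) {
      return true;
    }
    long lastBeatMillis = Files.getLastModifiedTime(heartbeatFile).toMillis();
    return System.currentTimeMillis() - lastBeatMillis > intervalMillis + graceMillis;
  }
}
```

The writer calls beat() on a schedule, while the cleaner and the pre-commit check on the writer itself both use isExpired() to decide whether the write should be treated as failed.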
We CANNOT use the same commit time to retry a failed batch of ingestion anymore. Without inline rollbacks, we are unable to remove failed writes from a previous unsuccessful job; hence every time a job tries to ingest data, it has to use a new commit time.
Users cannot run bootstrap in parallel. They need to finish the bootstrap of a non-Hudi table into Hudi first, and then start single or multiple writers. Rollbacks of failed bootstraps are also inline at the moment. To support the option proposed above, the inline rollback for bootstraps will also need to be removed and follow the same clean-up model as inline rollback for regular writers.
As of RFC-15 (HUDI-1292) there will only be a single writer to the consolidated metadata. We need to ensure that only a single writer updates the metadata table at any point of time. Applying the same changes twice is OK. The reconciliation of the metadata will be done by the metadata reader which will see unique instant times and corresponding metadata.
With multiple writers, there is no guarantee of ordering of data since different writers can finish writing at different instants of time. Here, we have a few options for incremental reads
Pros
Cons
Pros
Cons
This has the same Pros and Cons as Option 1, with an added API to move checkpoints based on a timeline different from the commit timeline. Essentially, you can imagine an overlay timeline called CompletedCommitOrderedTimeline() which provides times based on completed jobs, allowing checkpoints to move forward while preserving the commit timeline for Hudi operations.
In this implementation, we will allow out of order reading and leave the serialization of incremental reads to the users. As a follow up of this RFC, we will implement Option 3.
Scheduling of Table Management Services such as cleaning, archiving, compaction and clustering is currently done with a single-writer model. The assumption is that a single controller performs these operations, so that we can guarantee ACID between readers and writers, as well as run all operations asynchronously using the MVCC pattern.
With multiple writers, this assumption is violated. For any such service to be scheduled, the scheduler needs a "static" view of the table so it can perform the necessary logic to schedule an operation. We propose to acquire a lock when scheduling such operations. While this lock is held, NO other writer can change the state of the table (i.e. successfully commit), providing a "static" view of the table to the scheduling operation. Once the scheduling is done, the lock is released and other concurrent writers are free to acquire the lock and perform conflict resolution.
In a way, you can think of a Table Management Service as another concurrent writer to the table. We do acknowledge that users may require a priority ordering when choosing to resolve a conflict between a less important "clustering" activity vs an important "update" activity. The framework will allow for implementing such a priority ordering, but the details of how it will be implemented are out of the scope of this RFC and may be followed up in a different RFC.
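As an illustration only, the sketch below shows the shape of scheduling a table service under a table-level lock. The ReentrantLock stands in for whatever lock provider is ultimately chosen (DFS lock file, external lock service, etc.), and the class and method names are hypothetical:

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative sketch of scheduling a table service while holding a table-level lock.
public class TableServiceScheduler {

  private final Lock tableLock = new ReentrantLock();

  /** Build and persist a table service plan (compaction/clustering/...) against a "static" view of the table. */
  public void schedule(Runnable buildAndPersistPlan) {
    tableLock.lock();              // no writer can successfully commit while the plan is built
    try {
      buildAndPersistPlan.run();   // scan the sealed state of the table and write out the plan
    } finally {
      tableLock.unlock();          // writers and other schedulers are free to contend again
    }
  }
}
```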
We need to support a test-and-set style mechanism to provide unique instant times to multiple writers.
Pros
Cons
Increasing granularity of commit times to milliseconds
Pros
Cons
Step 1: Read the latest committed data from DFS or external server
Step 2: Perform write
Step 3: Acquire a table level LOCK before committing new data
Step 4: Before committing, check if the latest committed data to DFS or external server is the same as in Step 1. If YES, then continue to update the commit time atomically and reflect the new commit time for readers, release the LOCK. If NOT, compare the NEW_FILES_BEING_COMMITTED vs FILES_MUTATED_BY_COMMITS since the last committed data read in Step 1. If there are overlapping files, abort the write. If there are no overlapping files, finish the commit on DFS and update max(current_commit_time, latest_commit_time_from_external_server) if applicable.
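A minimal sketch of the file-level conflict check in Step 4 is shown below. The inputs are illustrative placeholders for file lists that would be derived from the Hudi timeline or the external server, and the class name is hypothetical:

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch of the conflict check performed before committing.
public class ConflictChecker {

  /**
   * Returns true when the write can be committed, i.e. none of the files written by
   * this transaction were also mutated by commits completed after the snapshot read
   * in Step 1; any overlap means the write must be aborted and retried.
   */
  public static boolean canCommit(Set<String> newFilesBeingCommitted,
                                  Set<String> filesMutatedByCommitsSinceSnapshot) {
    Set<String> overlap = new HashSet<>(newFilesBeingCommitted);
    overlap.retainAll(filesMutatedByCommitsSinceSnapshot);
    return overlap.isEmpty();
  }
}
```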
Step 1: Read the latest committed data from the hoodie timeline
Step 2: Perform write
Step 3: Acquire a lock under .hoodie/locks before committing new data. Lock IDs will be in monotonically increasing order; if there are no locks present under .hoodie/locks, the first LockID = 1. We need unique LockIDs for each transaction since we will be storing the commit time in the lock file for later purposes, and since a reader will read that file, we cannot modify the file once created. If there is already an acquired lock present, wait and retry until timeout. Otherwise, create the file lock1.acquired (if it does not exist) and write into it the commit time that acquired the lock.
Step 4: a) Ensure lock is acquired (file name is lock1.acquired) b) Compare the commit time in lock1.acquired with current commit to ensure the current job is the lock holder
Before committing, check if the latest committed data on the timeline is the same as in Step 1. If YES, then continue to update the commit time atomically and reflect the new commit time for readers. If NOT, compare the NEW_FILES_BEING_COMMITTED vs FILES_MUTATED_BY_COMMITS since the last committed data read in Step 1. If there are overlapping files, abort the write and unlock by creating lock1.released. If there are no overlapping files, finish the commit on DFS and create lock1.released.
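The sketch below illustrates Steps 3 and 4 of this lock-file protocol, assuming the underlying filesystem offers atomic create-if-not-exists semantics. java.nio is used purely for brevity, and the class name is hypothetical; an actual implementation would use the DFS client API:

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative sketch of the lock-file protocol under .hoodie/locks.
public class TimelineLock {

  private final Path locksDir;

  public TimelineLock(Path tablePath) throws IOException {
    this.locksDir = tablePath.resolve(".hoodie").resolve("locks");
    Files.createDirectories(locksDir);
  }

  /** Step 3: try to acquire lock<id> by atomically creating lock<id>.acquired with the commit time inside. */
  public boolean tryAcquire(long lockId, String commitTime) throws IOException {
    Path acquired = locksDir.resolve("lock" + lockId + ".acquired");
    try {
      Files.write(acquired, commitTime.getBytes(),
          StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE); // fails if another writer won
      return true;
    } catch (FileAlreadyExistsException e) {
      return false; // lock is held; caller waits and retries until timeout
    }
  }

  /** Step 4 a) + b): verify this job still holds the lock before committing. */
  public boolean isHeldBy(long lockId, String commitTime) throws IOException {
    Path acquired = locksDir.resolve("lock" + lockId + ".acquired");
    return Files.exists(acquired)
        && new String(Files.readAllBytes(acquired)).equals(commitTime);
  }

  /** Release the lock by creating lock<id>.released, as described above. */
  public void release(long lockId) throws IOException {
    Files.createFile(locksDir.resolve("lock" + lockId + ".released"));
  }
}
```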
With all the above approaches, there is a possibility of the job failing mid-way and not being able to release the lock. We need a way to expire these locks. Here, we rely on the heartbeat to determine when to expire a lock. Every time a new job wants to acquire the lock, it does 2 things: a) read the latest heartbeat and ensure the lock-holding job is still alive, b) check if the lock is available (filename is lock<id>.released). If there has not been a heartbeat within the specified timeframe, the following actions will be performed to release the lock
After this, the job proceeds to contend for and acquire the lock as usual. Note that every time a job tries to commit, it has to ensure 2 things: a) it goes from inflight → commit, b) it still holds the lock it had acquired.
Assumptions
Since Hudi is supported on all popular cloud stores that implement an HDFS-compatible API, we cannot make any assumptions about the underlying implementations. Specifically, implementations such as S3 do NOT support atomic renames. Hence, this solution will not be useful to a sizable population of Hudi users.
In this RFC, we propose to implement Option 1.
With multiple writers, there is no guarantee on ordered completion of commits (for conflict-free multiple writers). Incremental reads depend on monotonically increasing timestamps to ensure no insert/update is missed; this guarantee is violated with multiple writers. To be able to use multiple writers, we will introduce a new notion on the hoodie timeline for each commit called END_COMMIT_TIME, along with the START_COMMIT_TIME that is already used for performing commits. Incremental reads will depend on END_COMMIT_TIME to tail data and also publish it to downstream systems so they can checkpoint it.
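As a rough, illustrative sketch of what checkpointing on END_COMMIT_TIME could look like, assuming commit times are lexicographically comparable strings (as Hudi instant times are today), with CommitInstant and the method names being hypothetical placeholders:

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative sketch: incremental reads filter and order commits on completion time.
public class IncrementalPull {

  public static class CommitInstant {
    public final String startCommitTime; // instant used on the timeline today
    public final String endCommitTime;   // proposed completion time of the commit

    public CommitInstant(String startCommitTime, String endCommitTime) {
      this.startCommitTime = startCommitTime;
      this.endCommitTime = endCommitTime;
    }
  }

  /** Commits to read since the last checkpoint, filtered and ordered by END_COMMIT_TIME. */
  public static List<CommitInstant> commitsSince(List<CommitInstant> completedCommits,
                                                 String lastEndCommitCheckpoint) {
    return completedCommits.stream()
        .filter(c -> c.endCommitTime.compareTo(lastEndCommitCheckpoint) > 0)
        .sorted(Comparator.comparing((CommitInstant c) -> c.endCommitTime))
        .collect(Collectors.toList());
  }
}
```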
Once the proposed solution is implemented, users will be able to launch multiple writers.
1) Test the following scenario with a long running job:
// Add compaction, clustering
// Check archival kicking in
// Schema Evolution? (older commit vs new commit)
For TableType in (COW, MOR)
  For ConcurrencyMode in (SingleWriter, OCC (with non-overlapping & overlapping file ids))
    Test 1: EAGER cleaning, without metadata (validate no regression)
    Test 2: EAGER cleaning, with metadata (validate no regression)
    Test 3: EAGER cleaning, with metadata, with async cleaning (validate no regression)
    Test 4: LAZY cleaning, without metadata
    Test 5: LAZY cleaning, with metadata
    Test 6: LAZY cleaning, with metadata, with async cleaning
    Test 7: Incremental Pull (out of order execution of commits for OCC)
2) Add MultiWriter test cases
3) Enhance the Hoodie Test Suite to test the Spark Datasource with locks
Here, the recommendation is to first upgrade to a reliable build that comes with the option to turn on optimistic_concurrency_control. Once you have certified and rolled out this build in production, you can enable multiple writers by setting a few configs.
If for some reason, there are failures, you can turn off optimistic_concurrency_control and revert to single_writer model. Any failures will be seamlessly cleaned up by the writer.
By default, the scheduling of operations will be enabled for any job, for backwards compatibility with current users. Users need to ensure they turn on optimistic_concurrency_control for all writers.
As discussed before, there are 3 ways to achieve serializable snapshot isolation with multiple writers. Here, we chose to implement a lock-based optimistic concurrency control mechanism that allows concurrent writers to update the state of the table at the same time, while handling conflict resolution just before committing the transaction. Although we explained many of the technical pros and cons of this approach above, we have not discussed some of the business-driven pros and cons of using optimistic locking.
Consider this real world situation for one of the companies using Hudi. Here, there are three different writers to a Hudi table.
Depending on the nature of the data, when using optimistic concurrency control, there can be multiple situations that lead to write-write conflicts. If an update conflict arises, the optimistic concurrency implementation allows one transaction (the one that finishes first) to succeed while aborting all other conflicting transactions. The operations done by the aborted transactions therefore need to be retried. Again, depending on the nature of the data, this can keep happening repeatedly, never allowing some operations to succeed, and creating the following problems for your organization if
Here, we'd like to pause and revisit our choice of how to handle update conflicts. Our isolation requirements have not changed. A LOCK_FREE snapshot isolation remains to be explored as a possible alternative to OPTIMISTIC_CONCURRENCY. As discussed before, there are 2 ways to handle update conflicts: 1) conflict avoidance - done by systems such as ones supporting serializable isolation, 2) conflict resolution - done either with locks or lock-free by readers. Let's discuss the LOCK FREE conflict resolution model.
Let's first understand the types of consistency models (ACID) that ensure correctness
All of this is applicable to us when we think of the problem of strong vs eventual consistency as analogous to update conflicts from multiple writers. To achieve LOCK FREE snapshot isolation, we need to have different updates applied to the same datum without any synchronization or consensus (here, LOCKS), yet finally arrive at a correct, consistent view of the datum.
Another option to achieve eventual consistency is to explore Conflict-Free Replicated Data Types (CRDTs). In a CRDT model, data can be updated independently and concurrently without the need for any coordination/consensus, but with a predefined set of rules that are applied during conflict resolution at a later stage.
This is like eventual consistency but with deterministic outcomes of concurrent updates
A good example is the Last Writer Wins (LWW) methodology with timestamp-based resolution used to achieve eventual consistency. One of the problems with timestamp-based resolution is that all servers need to be clock-synchronized using NTP, with no scope for any clock drift. Ensuring this is extremely hard and requires specialized systems. Note that the LWW methodology assumes that the timestamp of the write is associated with the lowest granularity of the datum; in databases, this is generally a column of a row.
Since these methodologies defer the merging to the reader (realtime view), all of the following documentation assumes the hoodie table type to be MergeOnRead.
Hudi supports partial row updates, which means 2 different writes/updates could be updating 2 different columns of the same row. Since input data to Hudi generally comes from an upstream source such as Kafka, which may receive out-of-order writes, Hudi by itself cannot assign ordered timestamps to these updates. Hence, timestamp-based LWW does not directly work here.
If along with a timestamp based resolution, we also had a deterministic merge function that allows for conflict resolution of these 2 updates, we could possibly apply this function at a later stage and provide correct results. An implementation of such a merge function is a CRDT.
In a CRDT data structure, updates are
This merge function can be implemented by users to ensure a correct ordering of updates based on internal semantics of the data which may be timestamps or other such logic.
To summarize, using a CRDT, we can allow multiple writers to commit concurrently without the need for a lock to perform conflict resolution while still providing ACID semantics. When readers construct the latest state of the table, a CRDT ensures the 2 different snapshots of the data get applied and a deterministic, serial order can be maintained on update conflicts.
Since CRDTs have to be commutative, the order of "merge" does not matter for generating a correct result after applying the rules.
NOTE: One of the problems of eventual consistency with CRDTs arises from replication across multiple nodes, which makes it not ACID compliant. Since Hudi relies on underlying DFS-based replication, this concern does not apply.
Although CRDTs possess the above qualities, without a time-based ordering, data can be committed in any order and readers can view the data in any order. This violates Hudi's current single-writer serialized snapshot isolation, in which writes always happen in monotonically increasing timestamp order and that is how readers view the data. Additionally, Hudi already supports some out-of-the-box methodologies to merge existing data with newly written data, such as OverwriteLatestPayload. Since these cannot rely on the internal semantics of the data (as that is dependent on user payloads) and OverwriteLatestPayload is NOT a CRDT, we need a mechanism/rule (a CRDT of sorts?) to ensure these merge methodologies generate the same correct, expected results during concurrent writes as they would during serial writes.
Here, Hudi's commit timestamp comes to the rescue. Hudi has a _hoodie_commit_time associated with every record. A default implementation of a CRDT would be to provide an ordered set of updates to a datum in order of commit times. With this, applying the merge function in this order creates a CRDT. For Hudi's default implementation, the merge function available is OverwriteLatestPayload, which simply takes the latest record.
NOTE: Supporting this kind of ordered merge might require first sorting the LogBlocks in increasing order of START_COMMIT_TIME, since updates can happen out of order, and then applying the OverwriteLatestPayload.
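A minimal sketch of this default, commit-time-ordered merge is shown below, under the assumption that record versions carry their _hoodie_commit_time and that instant times compare lexicographically. RecordVersion and the class name are illustrative placeholders, not Hudi classes:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Illustrative sketch: order candidate versions of a datum by _hoodie_commit_time
// and keep the latest one (overwrite-latest semantics).
public class CommitTimeOrderedMerge {

  public static class RecordVersion {
    public final String hoodieCommitTime; // value of _hoodie_commit_time for this version
    public final Object payload;

    public RecordVersion(String hoodieCommitTime, Object payload) {
      this.hoodieCommitTime = hoodieCommitTime;
      this.payload = payload;
    }
  }

  /** The result is independent of input order because versions are re-ordered by commit time. */
  public static Optional<RecordVersion> merge(List<RecordVersion> versionsOfSameKey) {
    return versionsOfSameKey.stream()
        .max(Comparator.comparing((RecordVersion v) -> v.hoodieCommitTime));
  }
}
```

Because the versions are re-sorted before merging, the "merge" is commutative with respect to the order in which the concurrent writers committed, which is what makes the deterministic, serial ordering on update conflicts possible.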
START_COMMIT_TIMESTAMP vs END_COMMIT_TIMESTAMP
Let's take the example of these overlapping transactions as below
Consider 4 transactions T1, T2, T3, T4 starting and ending at different instants of time. Ideally, as per general transactions in a database, one would assume the following serial order for these transactions if they were to be done in a serial fashion.
This works fine for general-purpose database transactions where the input (updates to rows) is already prepared before starting the transaction. In Hudi, this is not true. Due to the nature of LAZY evaluation of data (Dataframes, Datasets etc.), when the actual update is prepared can differ from the transaction start time. Hence, ordering transactions/updates on start_commit_timestamp may not yield correct results.
An alternate option is to serially order them based on transaction end time (end_commit_timestamp); the serial order would then look as follows
As you can see, T2 jumps from being the second transaction to the last one.
Now it's hard to determine which is the correct order to choose, start or end commit timestamp, since we do not know when the input data is actually materialized. Assuming materialization of data is based on some kind of checkpoint, and that the latest snapshot read for a particular transaction is based on the transaction's start timestamp (since that can be decided at the beginning in the AbstractHoodieWriteClient), we choose to order transactions based on START_COMMIT_TIMESTAMP.
Note, implementing this requires changes in Hudi to ensure that the latest snapshot of the table is sealed at the start of the transaction and does not change during the course of the transaction, all the way to the end. Without this change, the above assumptions are violated, which would leave the table in an inconsistent state. More specifically, this requires sealing the ActiveTimeline and not reloading it for the entirety of the transaction.
Scheduling & Executing
Although we can solve the problem of update conflicts using CRDTs, there still exists the problem of how to coordinate other table services with these concurrent writes. Services such as compaction and clustering require a single writer or a table-level lock to get the current state of the system before such a service is scheduled. In the current implementation, no concurrent writer can be started when scheduling these services. Let's take a look at the underlying assumptions and limitations
To solve the issue of scheduling table services, we have the following options
Option 1: Brief, table-level locks for all writers and table services - writers take a table-level lock for the duration of creating the .commit file under the .hoodie folder. Note, this is NOT optimistic concurrency; the lock is just meant to "seal" the state of the table when committing data to it. Similarly, scheduling a table service takes the lock to scan the table and create the plan.
Option 2: Use monotonic timestamps to create ordered static views of the file system
Option 2 implementation
BASE FILE (F1_C1) + LOG_FILE (F1_L1_C1).<sequence_id>: For HDFS, we append multiple log blocks with different commit times inside the same log file. It's then hard to know which commits have gone into the log file just by its name. Instead, we could adopt the following changes
Now, consider the following sequence of operations without any LOCKS.
C1 C2 C3.inflight C4 C5.inflight
Now, a compaction is scheduled.
C1 C2 C3.inflight C4 C5.inflight COMPACTION.schedule.C6 C7.inflight
We should be able to get the past state of the table up to a point on the timeline without any inflights. Basically, we want to avoid any on-going write to a log file, so we want the latest completed state of the table. At this point, we will know which base_file + log files to compact. The problem is how to ensure that log files whose base commit is the previous compaction commit, and to which future writes are still happening, get picked up by the next compaction. Here, we simply add some logic to the HoodieTableFileSystemView during the building of the FileSlices: in the collection of all log files, when we slice it into groups belonging to the Cn compaction commit and ones belonging to the Cn-1 compaction commit, we can order them based on commit timestamp and simply group them together. This way, the on-going commits to log files whose base commit is the previous compaction commit timestamp will be "fast forwarded" to the FileSlice with the base file of the new compaction commit. A minimal sketch of this grouping is shown below.
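In the sketch, LogFile is an illustrative placeholder rather than a Hudi class; it only shows the idea of attaching all log files of a file group, ordered by the commit that wrote them, to the slice of the latest compaction commit:

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative sketch of the "fast forward" grouping described above.
public class FileSliceGrouping {

  public static class LogFile {
    public final String baseCommitTime; // commit time of the base file this log file was written against
    public final String path;

    public LogFile(String baseCommitTime, String path) {
      this.baseCommitTime = baseCommitTime;
      this.path = path;
    }
  }

  /**
   * Attach every log file of the file group, whether it was written against the
   * previous compaction's base commit or the new one, to the slice of the latest
   * compaction commit, ordered by commit time so no update is lost.
   */
  public static List<LogFile> fastForwardToLatestSlice(List<LogFile> allLogFilesOfFileGroup) {
    return allLogFilesOfFileGroup.stream()
        .sorted(Comparator.comparing((LogFile l) -> l.baseCommitTime))
        .collect(Collectors.toList());
  }
}
```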
With a log-based model and concurrent updates happening on the table at different instants of time, how many and which versions to keep becomes a challenge. We cannot remove or clean older versions of data until we know it is "safe" to do so. This safety depends on whether the latest, correct update version has been recorded in the log file.
Additionally, Hudi already supports "cleaning" for MergeOnRead tables only at the FileSlice level; any new updates going to log files cannot be cleaned individually, but only at the file slice level. With CRDTs, the same will apply.
Incremental pull allows pulling the changes happening on a dataset. Without multiple writers, there is a monotonically increasing timestamp against which incremental pull can extract the changes since the last timestamp. With multiple writers, as pointed out above in the optimistic concurrency implementation, this is violated. Another important thing to note is that incremental pull currently provides the latest snapshot of a record by merging all intermediate changes that happened from time t1 to t2.
For CRDT-based concurrency management, with updates applied out of order, we want to generate the incremental pull snapshot by applying the same CRDT rules so that no updates are lost for downstream consumers and they always see a consistent state of the data. NOTE that the OverwriteWithLatestRecord implementation is not a CRDT and special handling is needed for it, as pointed out below.
This would mean that the Spark, Presto and Hive MOR incremental pulls have to implement the same semantics of ordering updates as in the RealtimeView and in compaction.
NOTE that incremental pull based on END_COMMIT_TIME is still required for COPY_ON_WRITE tables using optimistic concurrency control management.
COW vs MOR
Let's dive deeper into the merge cost of MergeOnRead tables.
COW read path
MOR read path