Proposers
- Balaji Varadarajan (vbalaji)
- Udit Mehrotra (umehrot)
Approvers
- Vinoth Chandar @vinoth : APPROVED
- Nishith Agarwal @nagarwal : REQUESTED_INFO
Status
Current state: IN PROGRESS
Discussion thread: here
JIRA: here
Released: N/A
Abstract
With Apache Hudi growing in popularity, one of the fundamental challenges for users has been about efficiently migrating their historical datasets to Apache Hudi. Apache Hudi maintains per record metadata to perform core operations such as upserts and incremental pull. To take advantage of Hudi’s upsert and incremental processing support, users would need to rewrite their whole dataset to make it an Apache Hudi table. This RFC provides a mechanism to efficiently migrate their datasets without the need to rewrite the entire dataset.
Background
Here is some background information necessary to understand the design. Readers are expected to be familiar with basic Hudi concepts described here
Per Record Hudi Metadata
The above figure shows the layout of records in hudi. Each record has 5 Hudi metadata fields :
- _hoodie_commit_time : Commit time associated with the latest mutation of the record
- _hoodie_commit_seqno : To be used in incremental pull for creating multiple windows within a single ingested batch.
- _hoodie_record_key : Hudi Record Key used for updates and deletes
- _hoodie_partition_path : Partition-Path associated with the record
- _hoodie_file_name : File Name where the records are stored
Current Bootstrap Process:
Hudi provides built in support for migrating your entire dataset to Hudi one-time using HDFSParquetImporter tool available from the hudi-cli . You could also do this via a simple read and write of the dataset using the Spark datasource APIs.
Once migrated, writes can be performed using normal means discussed here. This topic is discussed in detail here, including ways to doing partial migrations. At the risk of repeating, here are the two current approaches:
Onboard Hudi for new partitions alone:
Apache Hudi partitions can coexist with other non-hudi partitions. Apache Hudi query engine integration is carefully implemented to handle queries that span across these partitions. This would let users use Hudi for managing new partitions while keeping older partitions untouched. In the example above, historical partitions from Jan 1 2010 to Nov 30 2019 are in non-hudi format while newer partitions starting from Dec 01 2019 support Apache hudi capabilities. As the historical partitions are not managed by Apache HUDI, none of the primitives provided by Apache HUDI work on the data in those partitions. For append only type of datasets (like a table built from reading mobile/time-series data from kafka), this would work perfect.
Rewriting Existing Dataset to Apache Hudi:
If users need Apache Hudi support for all portions of their datasets, they need to completely rewrite their data in Hudi. This is needed because Hudi maintains per-record metadata and index information. They can either perform this rewrite as a whole as one unit or employ mechanism to split the datasets by partitions and load them. More concrete details are present here
On Rewriting existing dataset to Hudi:
Even though this is a one-time operation, users with very large data-lake installations may find it challenging to perform these migration at full-scale.
Large historical Fact tables usually have a pattern with a large number of columns. Nested columns are also not uncommon in these cases. Rewriting a large volume of such records has high read and write cost along with massive compute power (for columnar generation).
Providing an efficient mechanism to migrating historical tables is crucial to painless adoption of Apache Hudi. This RFC proposes a mechanism to achieve that.
Proposal
The below figure represents a conceptual layout for each record. For ease of visualization, It is represented in row format even though Parquet uses columnar format. Also, in the below figure, we assume that the index being used is Bloom Index which is the de-facto index deployed
As you can note from the above diagram, an Apache HUDI physical file contains 3 things relevant to our discussion
- For each record, 5 HUDI metadata fields with column indices 0 to 4
- For each record, the original data columns that comprises the record (Original Data)
- Additional Metadata at file footer for index lookup
Raw-data tables are usually bulkier with large number of columns per record. Only (1) and (3) are the additional metadata that makes a given Apache HUDI parquet file special.
For the purpose of this discussion, let us name the combination of (1) and (3) as “Hudi skeleton”. Hudi skeleton contains additional metadata that it maintains in each physical parquet files for supporting Hudi primitives.
The conceptual idea is to decouple Hudi skeleton data from original data (2). Hudi skeleton can be stored in Hudi file while the original data is stored in an external non-Hudi file.
As long as Hudi primitives can be made to understand this new file format changes, bootstrapping an existing table would only require generating Hudi skeleton metadata only. From initial experiments on some production grade raw-data table, we find this bootstrap mechanism to be an order of magnitude faster than normal bootstrap. The new bootstrap process for a dataset containing around 3500 partitions, 250K files and around 60 billion rows with a single field as row-key took around 1 hour to bootstrap with 500 executors each containing 1 core and 4 GB memory. The old bootstrap process though required 4x the number of executors (2000) to finish bootstrapping in a day (~24 hrs).
New Bootstrap Process:
The new bootstrap process would involve the following steps. Let us imagine a parquet dataset “fact_events” needs to be bootstrapped to hudi dataset. Let us imagine that the root path of this dataset is “/user/hive/warehouse/fact_events” and there are several day based partitions within fact_events. Within each partition, there are several parquet files. Please see below for a pictorial representation.
With the above setup, let us imagine that the user is using this new bootstrap mechanism to bootstrap this table to a new hudi dataset “fact_events_hudi” located at “/user/hive/warehouse/fact_events_hudi”
- User stops all write operations on original dataset.
- User initiates this new bootstrap one time either through delta-streamer or through a standalone tool. Users provide the following information as part of the bootstrap:
- Original (non-hudi) dataset base location
- Columns to be used for generating hudi keys.
- Parallelism for running this bootstrap
- New Hudi Dataset location
- Hudi bootstrap scans partitions and files in the original root location “/user/hive/warehouse/fact_events” and performs the following operations :
- Creates similar hudi partitions in the new dataset location. In the above example, there will be day based partitions created under “/user/hive/warehouse/fact_events_hudi”
- Using Spark parallelism, generates unique file ID and uses it to generate a hudi skeleton parquet file for each original parquet file. A special commit timestamp called “BOOTSTRAP_COMMIT” is used. For the remainder of this document, let us imagine BOOTSTRAP_COMMIT having the timestamp “000000000”. For example, if an original parquet file is at the path /user/hive/warehouse/fact_events/year=2015/month=12/day=31/file1.parquet. Let the unique file id getting generated is h1, then the skeleton file corresponding to the above parquet file will be stored at path /user/hive/warehouse/fact_events_hudi/year=2015/month=12/day=31/h1_1-0-1_000000000.parquet.
- Generates a special bootstrap index which maps each newly generated hudi skeleton file id to its corresponding original parquet file.
- Atomically commits the bootstrap operation using same hudi timeline state transitions. Standard rollback will also be supported to rollback inflight and committed bootstrap.
- If hive syncing is enabled, creates a brand new hudi hive table pointing to the new location - “/user/hive/warehouse/fact_events_hudi”
- Subsequent write operations happen on the hudi dataset.
Bootstrap Index:
The purpose of this index is to map the Hudi skeleton file with its associated external parquet-file containing original data columns. This information is part of Hudi’s file-system view and is used when generating file slices. In this respect, there is a similarity between compaction plan and bootstrap index. But unlike compaction plan, bootstrap index could be larger in size for large historical tables. Hence, a proper storage format needs to be selected to be read and write efficient.
Hudi’s file-system view is an abstraction that translates physical file-names to file-groups and file-slices. The granularity for the apis supported from this layer is at partition-level. Hence, a bootstrap index must provide faster lookups to read a single partition’s index information.
The suitable storage format is a variation of Hadoop Map File containing 2 types of files:
- Bootstrap Log : This is a bunch of sequence files with each entry containing bootstrap index information for all files within a single partition. This is a log as any changes to bootstrap index for a given partition can be done by merely adding a new entry this log files.
- Index to Bootstrap Log: This is a sequence file containing hudi-partition and the file-name and offset where bootstrap index log entry for the corresponding hudi partition is present.
With this layout, the bootstrap process using spark parallelism can control the number of such bootstrap log files containing bootstrap index information and consequently the speed of bootstrap index generation. Using the index to bootstrap log, both Hudi readers and writer can simply load the bootstrap index of the partitions they care about efficiently. The index to bootstrap log has one entry per partition and can be read into memory or into rocksDB.
Hudi cleaner operations typically removes old file slices that are no longer required. As hudi skeleton file generated due to bootstrap is part of a file-slice, it is also subjected to cleaning. Whenever such file-slices are cleaned, the bootstrap index also needs to be updated to reflect the state. Even though such cleanup is not necessary for correctness, it will keep the state consistent and gradually reduce the footprint of the bootstrap index. To support such mutations with ACID guarantees, similar MVCC mechanism like that of timeline management will be supported to keep the bootstrap index up to date while keeping concurrent readers isolated from the updates.
Supporting Upserts and Read Use-Cases:
This section describes how rest of Hudi abstractions support this new file storage and how they work together to support Hudi primitives on the bootstrapped partitions.
From the concepts page, A “file slice” refers to one complete snapshot of a logical hudi file. It contains one base file and one or more delta files. Conceptually, we encapsulate the bootstrap index information at file-slice (data-file) level. So a file-slice would be able to provide information about the external location where the original columns are residing.
In Hudi, we have an implementation abstraction call file system view which maps physical files to file-slices. This abstraction will also annotate file-slices with bootstrap index entries (skeleton-file to external-file) so that higher layers can handle external files in a consistent way.
With this model, if we need to update a batch of records “1-K” in old partitions for the newly bootstrapped table “fact_events_hudi”, the following steps are performed,
- Let us say the commit time associated with this upsert operation is “C1”. It is given that C1 is greater than the “BOOTSTRAP_COMMIT” (000000000).
- Assuming Bloom Index, index lookup happens directly on Hudi skeleton files. Let’s say the hudi skeleton file with file id “h1” has all the records,
- In the coming description, “regular” hudi file means it is a hudi parquet file with per-record hudi metadata columns, original columns and bloom index in the single file. For Copy-On-Write table, the writing phase identifies that the latest file-slice for the file Id “h1” is generated by bootstrap using special bootstrap commit time. It reads the original external file stored under original root location “/user/hive/warehouse/fact_events”. Hudi Merge Handle reads both this external file and the metadata-only hudi file parallelly, stitching the records together and merging them with incoming batch of records to create a “regular” hudi file with brand new version for the fileId “h1”.
- For Merge-On-Read table, ingestion would simply append to a delta log file and a subsequent compaction performs similar steps as Copy-On-Write table to generate a “regular” hudi file with brand new version for the fileId “h1”.
Hudi implements custom input formats to integrate with query engines. These existing custom input formats will recognize special bootstrap commit and performs column stitching between hudi record-level metadata fields in the skeleton hudi file and other columns present in external parquet file to provide same views as existing hudi tables. Note that only projected columns required by the query will be read from the physical parquet files. Please see below for a pictorial representation of how query engine integration is done
Requirements
- As with any Hudi datasets, the uniqueness constraint of record keys is expected for the dataset to be bootstrapped. Hence, care must be taken to select the columns in the original dataset to guarantee uniqueness. Otherwise, proper upsert for records corresponding to duplicate keys is not guaranteed.
Data Source Support
This section proposes a design for integrating Hudi Bootstrapped table with Spark DataSource, so that Copy-on-Write tables can be read using the Hudi data source using the following ways:
val df = spark.read.format("hudi").load("s3://<bucket>/table1/") val df = spark.read.format("hudi").load("s3://<bucket>/table1/partition1/")
Note: We can also accept a path pattern here instead, to maintain compatibility with the current behavior but for that we will have to implement our own handling of the patterns.
Proposal for COW snapshot queries
The idea here is to implement a new Spark Relation and Spark RDD that will be used for scanning and reading bootstrapped tables. The custom relation will implement PruneFilteredScan to allow for supporting filters pushdown and column pruning. For the RDD, each partition will be data file + optional skeleton file combination which will be sent to one task to perform the merge and return the results.
Following code skeleton is to provide a high-level outline of what we want to achieve here. API signatures may change as we set out to implement.
1. package org.apache.hudi.skeleton 2. 3. import org.apache.spark.rdd.RDD 4. import org.apache.spark.sql.{Row, SQLContext} 5. import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan} 6. import org.apache.spark.sql.types.StructType 7. 8. case class HudiBootstrapTableState(files: List[HudiBootstrapSplit]) 9. 10. case class HudiBootstrapSplit(dataFile: String, 11. skeletonFile: String) 12. 13. class HudiBootstrapRelation(val sqlContext: SQLContext, 14. val basePath: String, 15. val optParams: Map[String, String], 16. val userSchema: StructType) 17. extends BaseRelation with PrunedFilteredScan { 18. 19. override def schema: StructType = ??? 20. 21. override def buildScan(requiredColumns: Array[String], 22. filters: Array[Filter]): RDD[Row] = { 23. // Perform the following steps here: 24. // 1. Perform file system listing to form HudiBootstrapTableState which would 25. // maintain a mapping of Hudi skeleton files to External data files 26. // 27. // 2. Form the HudiBootstrapRDD and return it 28. 29. val tableState = HudiBootstrapTableState(List()) 30. new HudiBootstrapRDD(tableState, sqlContext.sparkSession).map(_.asInstanceOf[Row]) 31. } 32. } 1. package org.apache.hudi.skeleton 2. 3. import org.apache.spark.{Partition, TaskContext} 4. import org.apache.spark.rdd.RDD 5. import org.apache.spark.sql.SparkSession 6. import org.apache.spark.sql.catalyst.InternalRow 7. 8. class HudiBootstrapRDD(table: HudiBootstrapTableState, 9. spark: SparkSession) 10. extends RDD[InternalRow](spark.sparkContext, Nil) { 11. 12. override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { 13. // This is the code that gets executed at each spark task. We will perform 14. // the following tasks here: 15. // - From the HudiBootstrapPartition, obtain the data and skeleton file paths 16. // - If the skeleton file exists (bootstrapped partition), perform the merge 17. // and return a merged iterator 18. // - If the skeleton file does not exist (non-bootstrapped partition), read 19. // only the data file and return an iterator 20. // - For reading parquet files, build reader using ParquetFileFormat which 21. // returns an Iterator[InternalRow]. 22. // - Merge the readers for skeleton and data files and return a single 23. // Iterator[InternalRow] 24. // - Investigate and implement passing of filters and required schema down 25. // for pruning and filtering optimizations that ParquetFileFormat provides. 26. } 27. 28. override protected def getPartitions: Array[Partition] = { 29. // Form the partitions i.e. HudiBootstrapPartition from HudiBootstrapTableState. 30. // Each spark task would handle one partition. Here we can do one of the 31. // following mappings: 32. // - Map one HudiBootstrapSplit to one partition, so that each task would 33. // perform merging of just one split i.e. data file and skeleton 34. // - Map multiple HudiBootstrapSplit to one partition, so that each task 35. // would perform merging of multiple splits i.e. multiple data/skeleton files 36. 37. table.files.zipWithIndex.map(file => 38. HudiBootstrapPartition(file._1, file._2)).toArray 39. } 40. } 41. 42. case class HudiBootstrapPartition(split: HudiBootstrapSplit, 43. index: Int) extends Partition
Advantages:
- We do not have to make any changes in Spark code base.
- Provides a way to control file listing logic to list down the skeleton files, and then map them to the corresponding external data file.
- Provides control over what goes into each partition and computation logic for each partition, which is what we want to achieve here.
- Same design can be further applied to Merge-on-Read tables.
Disadvantages:
- File splitting would not be supported, which may have impact on the read performance. Each task would handle merging of one skeleton + data file only. But at the same time, we do not currently have a way to handle splits of skeleton + data parquet files, to be able to split them at exactly the same row offsets and then merge them later. Even with the InputFormat column stitching logic we would have to disable splitting of files, and each split will be mapped to one file. Thus, in that sense we would following a similar approach.
Proposal for COW incremental queries
For Incremental queries we would have to employ similar logic to re-design the IncrementalRelation currently implemented in Hudi code. We can probably use the same RDD implementation there, which we implement for the snapshot query case.
Rollout/Adoption Plan
- This will be rolled out as an experimental feature in 0.5.1
- Hudi Writers and Readers do not need special configuration to identify tables using this new bootstrap mechanism. The presence of special bootstrap commit and bootstrap index will automatically trigger correct handling of these tables.
Test Plan
This change will affect all use-cases of Hudi and hence would need to be tested comprehensively at unit-testing, integration testing and in long-running testing modes
TODOs
- Spark DataSource Integration for reading such bootstrapped tables is yet to be designed.
6 Comments
Balaji Varadarajan
Will review the comments and address them once I am back from vacation.
Nishith Agarwal
Left a few comments for clarification, rest looks super awesome, great documentation!
Udit Mehrotra
Nice proposal Balaji ! Left a couple of comments. In addition, we might want to check the impact it is going to have on read performance, because of the lookup/merging its going to have.
Balaji Varadarajan
Vinoth Chandar Nishith Agarwal : Udit Mehrotra has a proposal for supporting Data Source for Bootstrapped Tables which I think can be extended for RT queries too (if using parquet to store delta files). Please review the Spark DataSource section.
Vinoth Chandar
Udit Mehrotra Left few comments.. Mostly on the same page.. I wonder if we should first try to fix the spark datasource reading path in general (complete the matrix here http://hudi.apache.org/docs/querying_data.html#merge-on-read-tables for MOR), and then later adapt for bootstrapping?
Historically, we have dug ourselves into few holes thinking COW specific
Udit Mehrotra
I think it all depends upon our priority. If we want to release Bootstrapping feature and have user's try it out in production, we would at the minimum need to have data source support for COW bootstrapped tables, like we have currently for COW Hudi tables. MOR is either ways not supported right now through data source. In this case, I don't think that it is a COW specific choice. If we are able to get this working for Bootstrapped tables (merging two parquet files and returning results), I can easily see this being extended for MOR tables. And also this design being used for non-bootstrapped MOR tables.
However, happy to work on MOR and Spark SQL stuff first if we reach a conclusion that it has higher priority.