Proposers
Approvers
Status
Current state:

Current State |  |
---|---|
UNDER DISCUSSION |  |
IN PROGRESS |  |
ABANDONED |  |
COMPLETED |  |
INACTIVE |  |
Discussion thread:
JIRA: HUDI-897, HUDI-957
Released: <Hudi Version>
Abstract
Typical data lake workloads include analysis of databases, logs, and files. One of the key trade-offs in managing a data lake is choosing between write throughput and query performance. For better write throughput, it is usually better to write incoming data into smaller data files; this increases parallelism and improves ingestion speed substantially. But it can also create many small files. In addition, data locality is not optimal in this approach: data is co-located with other records from the incoming batch rather than with data that is queried often. Small file sizes and lack of data locality can degrade query performance. Finally, for many file systems including HDFS, performance degrades when there are many small files.
In this proposal, we present a clustering framework for improving write throughput without compromising query performance. The clustering framework can be used to rewrite data in a different layout. Some example use cases:
- Improve freshness: write small files, then stitch them into larger files after certain criteria are met (time elapsed, number of small files, etc.).
- Improve query performance: change the data layout on disk by sorting data on (different) columns.
Implementation
The Hoodie write client insert/upsert/bulk_insert operations will continue to function as before. Users can set the small file soft limit to 0 to force new data into a new set of file groups. In addition, a ‘clustering’ action is provided to rewrite data in a different layout. Clustering can run asynchronously or synchronously and provides snapshot isolation between readers and writers. The exact steps taken for clustering are listed below for each table type.
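As a rough sketch (not an exact recipe from this RFC), forcing new inserts into fresh file groups only requires setting the small-file limit to 0 on an otherwise normal datasource write. The snippet below assumes the same spark-shell setup (imports, df, tableName, basePath) as the quickstart-style snippet in the performance evaluation section; the record key, precombine, and partition fields are placeholders.

// Sketch: disable small-file handling so each ingest batch writes new file groups.
// "hoodie.parquet.small.file.limit" is the commonly used small-file config key; verify it for your Hudi version.
df.write.format("org.apache.hudi").
  options(getQuickstartWriteConfigs).
  option("hoodie.parquet.small.file.limit", "0").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)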
COW Table timeline
In the example flow chart above, we show the state of a partition over time (t5 to t9). The sequence of steps taken for writing is listed below.
- At t5, a partition in the table has 5 file groups f0, f1, f2, f3, f4. For simplicity, assume that each file group is 100MB. So the total data in the partition is 500MB.
- A clustering operation is requested at t6. Similar to compaction, we create a “t6.clustering.requested” file in metadata with ‘ClusteringPlan’ that includes all the file groups touched by clustering action across all partitions.
- Example contents:
- { partitionPath: {"datestr"}, oldfileGroups: [ { fileId: "f0", time: "t0" }, { fileId: "f1", time: "t1" }, ... ], newFileGroups: ["c1", "c2"] }
- Let's say the maximum file size after clustering is configured to be 250MB. Clustering would redistribute all the data in the partition into two file groups: c1, c2. These file groups are ‘phantom’ and invisible to queries until clustering is complete at t8.
- Also, note that records in a file group can be split into multiple file groups. In this example, some records from the f4 file group go to both new file groups c1, c2.
- While the clustering is in progress (t6 through t8), any upserts that touch these file groups are rejected.
- After writing new data files c1-t6.parquet and c2-t6.parquet, if a global index is configured, we add entries in the record level index for all the keys with the new location. The new index entries will not be visible to other writes because there is no commit associated yet.
- Finally, we create a commit metadata file ‘t6.commit’ that includes file groups modified by this commit (f0,f1,f2,f3,f4).
- Note that file groups (f0 to f4) are not deleted from disk immediately. Cleaner would clean these files before archiving t6.commit. We also update all views to ignore all file groups mentioned in all the commit metadata files. So readers will not see duplicates.
Note that there is a possible race condition at step 5 if multiple writers are allowed. Another writer could have started upserts just before the ‘clustering.requested’ file is written. In the initial version, for simplicity, we assume there is only a single writer. The writer can either schedule clustering or run ingestion. The actual clustering operation can run asynchronously. When Hudi has multi-writer support (see RFC-22), we can consider making scheduling asynchronous too.
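To make the end state concrete, here is a purely illustrative sketch of what the completed commit conceptually records for this example; field names are approximate and HoodieReplaceCommitMetadata in the codebase is the source of truth.

{
  "operationType": "CLUSTER",
  "partitionToReplaceFileIds": { "datestr": ["f0", "f1", "f2", "f3", "f4"] },
  "partitionToWriteStats": { "datestr": [ { "fileId": "c1", "path": "c1-t6.parquet" }, { "fileId": "c2", "path": "c2-t6.parquet" } ] }
}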
MOR Table timeline
This is very similar to the COW table. For a MOR table, inserts can go into either parquet files or log files, and this approach will continue to support both modes. The output of clustering is always in parquet format. Also, compaction and clustering cannot run at the same time on the same file groups; compaction also needs changes to ignore file groups that have already been clustered.
Clustering steps
Overall, there are two parts to clustering:
- Scheduling clustering: Create clustering plan
- Execute clustering: Process the plan. Create new files and replace old files.
Scheduling clustering
The following steps are used to schedule clustering.
- Identify files that are eligible for clustering
- Filter specific partitions (based on config to prioritize latest vs older partitions)
- Any files that have size > targetFileSize are not eligible for clustering
- Any files that have pending compaction/clustering scheduled are not eligible for clustering
- Any filegroups that have log files are not eligible for clustering (We could remove this restriction at a later stage.)
- Group files that are eligible for clustering based on specific criteria. Each group is expected to have a data size that is a multiple of ‘targetFileSize’. Grouping is done as part of the ‘strategy’ defined in the plan. We can provide the following strategies:
- Group files based on record key ranges. This is useful because the key range is stored in the parquet footer and can be used for certain queries/updates.
- Group files based on commit time.
- Group files that have overlapping values for custom columns
- As part of clustering, we want to sort data by column(s) in the schema (other than row_key). Among the files that are eligible for clustering, it is better to group files that have overlapping data for the custom columns.
- We have to read the data to find this, which is expensive with the way ingestion works. We could consider storing value ranges as part of ingestion (we already do this for the record_key). This requires more discussion. Probably, in the short term, we can focus on strategy 2a below (no support for sorting by custom columns).
- Example: say the target of clustering is to produce 1GB files. Partition initially has 8 * 512MB files. (After clustering, we expect data to be present in 4 * 1GB files.)
- Assume among 8 files, say only 2 files have overlapping data for the ‘sort column’, then these 2 files will be part of one group. Output of the group after clustering is one 1GB file.
- Assume among 8 files, say 4 files have overlapping data for the ‘sort column’, then these 4 files will be part of one group. Output of the group after clustering is two 1GB files.
- Group random files
- We could put a cap on group size to improve parallelism and avoid shuffling large amounts of data
- Filter groups based on specific criteria (akin to orderAndFilter in CompactionStrategy)
- Finally, the clustering plan is saved to the timeline. Structure of metadata is here: https://github.com/apache/hudi/blob/master/hudi-common/src/main/avro/HoodieClusteringPlan.avsc
In the ‘metrics’ element, we could store the ‘min’ and ‘max’ for each column in the file to help with debugging and operations.
Note that this scheduling can be plugged in with a custom implementation. In the first version, the following strategy is provided by default.
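For orientation, the serialized plan looks roughly like the sketch below. The field names here are approximate and the linked Avro schema is authoritative; the strategy class name is a placeholder and sortColumns is shown only as an example strategy parameter.

{
  "strategy": {
    "strategyClassName": "<plan strategy class>",
    "strategyParams": { "sortColumns": "col_a,col_b" }
  },
  "inputGroups": [
    {
      "slices": [
        { "partitionPath": "datestr", "fileId": "f0" },
        { "partitionPath": "datestr", "fileId": "f1" }
      ],
      "metrics": { },
      "numOutputFileGroups": 1
    }
  ],
  "extraMetadata": { },
  "version": 1
}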
Running clustering
- Read the clustering plan and look at the number of ‘clusteringGroups’; this determines the parallelism.
- Create inflight clustering file
- For each group
- Instantiate appropriate strategy class with strategyParams (example: sortColumns)
- Strategy class defines partitioner and we can use it to create buckets and write the data.
- Create replacecommit. Contents are in HoodieReplaceCommitMetadata
- operationType is set to ‘clustering’.
- We can extend the metadata and store additional fields to help track important information (strategy class can return this 'extra' metadata information)
- strategy used to combine files
- track replaced files
In the first version, the following strategy, based on 'bulk_insert', is provided as the default option.
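A minimal sketch of driving both steps through the Spark write client; the same two calls appear in the performance evaluation section below. The writeClient construction and clustering configs are assumed to be set up already, and exact signatures may vary slightly across Hudi versions.

// Assumes `writeClient` is an already-built Spark write client with clustering configs set,
// and `Option` is org.apache.hudi.common.util.Option (as in the evaluation section below).
val clusteringInstant = writeClient.scheduleClustering(Option.empty())
if (clusteringInstant.isPresent) {
  // 'true' asks the client to also complete (commit) the resulting replacecommit.
  val clusterMetadata = writeClient.cluster(clusteringInstant.get, true)
}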
Commands to schedule and run clustering
Quick start using Inline Clustering
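The snippet below is a sketch of inline clustering driven from a regular Spark datasource write, not copied verbatim from a release. The clustering config keys are the commonly documented ones and should be verified against your Hudi version; the table path, table name, and field names are placeholders, and df is assumed to be the incoming batch DataFrame.

// Sketch: regular ingest with inline clustering enabled.
// Clustering is triggered inline every N commits and sorts data by the configured columns.
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_clustering_demo"
val basePath = "file:///tmp/hudi_clustering_demo"

df.write.format("org.apache.hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  option("hoodie.parquet.small.file.limit", "0").
  option("hoodie.clustering.inline", "true").
  option("hoodie.clustering.inline.max.commits", "4").
  option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824").
  option("hoodie.clustering.plan.strategy.small.file.limit", "629145600").
  option("hoodie.clustering.plan.strategy.sort.columns", "column1,column2").
  mode(Append).
  save(basePath)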
Setup for Async clustering Job
Clustering can be scheduled and run asynchronously using a Spark job. The utilities Spark job can be found here.
1. Prepare the clustering config file (a sample properties file and the schedule/run commands are sketched after this list).
2. Schedule clustering.
You can find the scheduled clustering instant time in the Spark logs, under the log prefix "The schedule instant time is". In this example, the scheduled clustering instant time is 20210122190240.
3. Use the scheduled instant time "20210122190240" to run clustering.
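Putting the three steps together, one plausible setup looks like the sketch below. The class name and flags correspond to the HoodieClusteringJob shipped in hudi-utilities, but they are written from memory here, so treat paths, table name, and exact option names as placeholders to verify against your Hudi version. The property values are the same ones used in the performance evaluation section.

# 1. clustering.properties (values are examples)
hoodie.clustering.plan.strategy.sort.columns=product_category,review_date
hoodie.clustering.plan.strategy.max.bytes.per.group=107374182400
hoodie.clustering.plan.strategy.max.num.groups=1

# 2. Schedule clustering; the driver log prints "The schedule instant time is <instant>"
spark-submit \
  --class org.apache.hudi.utilities.HoodieClusteringJob hudi-utilities-bundle.jar \
  --schedule \
  --base-path /path/to/hudi_table \
  --table-name <table_name> \
  --props /path/to/clustering.properties \
  --spark-memory 1g

# 3. Run clustering using the instant time printed in step 2
spark-submit \
  --class org.apache.hudi.utilities.HoodieClusteringJob hudi-utilities-bundle.jar \
  --instant-time 20210122190240 \
  --base-path /path/to/hudi_table \
  --table-name <table_name> \
  --props /path/to/clustering.properties \
  --spark-memory 1g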
Some caveats
There is WIP to fix these limitations, but these issues are worth mentioning:
- This is an alpha feature. Although there is good unit test coverage, there may be some rough edges. Please report any issues.
- Better support for async clustering is coming soon.
- Clustering doesn't work with incremental timeline. So disable it by setting "hoodie.filesystem.view.incr.timeline.sync.enable: false"
- Incremental queries are not supported with clustering. Incremental queries consider all the data written by clustering as new rows.
- Clustering creates a new type of commit, "timestamp.replacecommit". There may be some places in the code where we only read commits/deltacommits and miss replacecommits when reading valid commits from the timeline. This can cause discrepancies in some cases.
- The clean policy is different for 'replacecommit', so more versions may be retained, leading to extra storage usage.
Performance Evaluation
Dataset: https://s3.amazonaws.com/amazon-reviews-pds/readme.html
Query: select sum(total_votes), product_category from amzn_reviews where review_date > '2007' and review_date < '2009' group by product_category
Convert dataset to hoodie format
val df = spark.read.option("sep", "\t").option("header", "true").csv(amznReviewsRawDataPath)
val tableName = "reviews"

df.write.format("org.apache.hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "customer_id").
  option(RECORDKEY_FIELD_OPT_KEY, "review_id").
  option(PARTITIONPATH_FIELD_OPT_KEY, "marketplace").
  option(OPERATION_OPT_KEY, "insert").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(amznReviewHudiPath) // creates ~500 data files in one partition
Evaluate query time (No Clustering)
Query takes ~10 seconds:

scala> spark.time(spark.sql("select sum(total_votes), product_category from amzn_reviews where review_date > '2007' and review_date < '2009' group by product_category").collect())
Time taken: 10018 ms
Perform clustering
// create writeClient overriding the following write configs:
// "hoodie.clustering.plan.strategy.sort.columns" -> "product_category,review_date"
// "hoodie.clustering.plan.strategy.max.bytes.per.group" -> "107374182400"
// "hoodie.clustering.plan.strategy.max.num.groups" -> "1"

val clusteringInstant = writeClient.scheduleClustering(Option.empty())
val metadata = writeClient.cluster(clusteringInstant.get, true) // creates ~350 data files and replaces the existing ~500 data files in one partition
Verify replacecommit is created
$ hadoop fs -ls $amznReviewHudiPath/.hoodie/
Found 10 items
drwxr-xr-x - satish 0 2021-01-20 18:38 $amznReviewHudiPath/.hoodie/.aux
drwxr-xr-x - satish 0 2021-01-21 00:49 $amznReviewHudiPath/.hoodie/.temp
-rw-r--r-- 3 satish 445621 2021-01-20 18:41 $amznReviewHudiPath/.hoodie/20210120183848.commit
-rw-r--r-- 3 satish 0 2021-01-20 18:39 $amznReviewHudiPath/.hoodie/20210120183848.commit.requested
-rw-r--r-- 3 satish 979 2021-01-20 18:40 $amznReviewHudiPath/.hoodie/20210120183848.inflight
-rw-r--r-- 3 satish 493983 2021-01-21 00:51 $amznReviewHudiPath/.hoodie/20210121004731.replacecommit
-rw-r--r-- 3 satish 0 2021-01-21 00:47 $amznReviewHudiPath/.hoodie/20210121004731.replacecommit.inflight
-rw-r--r-- 3 satish 131495 2021-01-21 00:47 $amznReviewHudiPath/.hoodie/20210121004731.replacecommit.requested
drwxr-xr-x - satish 0 2021-01-20 18:38 $amznReviewHudiPath/.hoodie/archived
-rw-r--r-- 3 satish 228 2021-01-20 18:38 $amznReviewHudiPath/.hoodie/hoodie.properties
Evaluate query time (with Clustering). Note that the same query from step 2, which took 10 seconds, now runs in 4 seconds.
Query takes ~4 seconds:

scala> spark.time(spark.sql("select sum(total_votes), product_category from amzn_reviews where review_date > '2007' and review_date < '2009' group by product_category").collect())
Time taken: 4099 ms
Summary
In summary, rewriting the data using clustering sped up this query's runtime by ~60%.
Rollout/Adoption Plan
- No impact on existing users, since this adds a new, optional action. Note that if you are already using Hudi, it is important to move your readers to the 0.7 release before upgrading writers, because clustering creates a new type of commit and it is important that query engines recognize this new commit type.
Test Plan
- Unit tests
- Integration tests
- Test on the cluster for a larger dataset.
24 Comments
Vinoth Chandar
I like where you are going with this in spirit.. let's work through this..
We already have an async compaction framework, so that's not the issue.. rather, the problem is how/if we allow upserts on top of such a data set eventually... if so, there are some considerations..
liwei
Thanks, Vinoth Chandar
Just like https://issues.apache.org/jira/projects/HUDI/issues/HUDI-112, we need a mechanism to solve two issues.
1. On the write side: skip compaction for faster writes (merge on read can solve this today).
2. Compaction and read: we also need a mechanism to collapse older smaller files into larger ones while keeping query cost low (with merge on read, if we do not compact, realtime reads will be slow).
We have an option:
1. On the write side: just write parquet, with no compaction.
2. Compaction and read: because the small files are parquet, realtime reads can be fast, and users can run asynchronous compaction to collapse older smaller parquet files into larger parquet files.
Best Regards,
Wei Li.
Nishith Agarwal
Satish Kotha is looking into how to support collapse-style operations at Uber ( https://issues.apache.org/jira/projects/HUDI/issues/HUDI-112 ) and scoping out the work. This will not only allow us to merge small parquet files asynchronously but also move data around for better compression, query performance, etc.
cc Vinoth Chandar liwei
liwei
Thanks, Nishith Agarwal
Our issue is just like ( https://issues.apache.org/jira/projects/HUDI/issues/HUDI-112 ). Do you know the current progress of this issue?
Best Regards,
Wei Li.
Vinoth Chandar
Satish Kotha Nishith Agarwal liwei Shaofeng Li Balaji Varadarajan Bhavani Sudha cc'ing everyone here.
Summarizing thoughts on whether we can achieve these goals without assuming RFC-15 is done... The more I thought about it, the more I felt like our way of modelling transaction actions using a log of events in the timeline is the right way - databases have done this for decades (unlike other systems which have gone ahead and treated a snapshot as the source of truth).. RFC-15 will make a lot of this metadata consolidation more performant/efficient, of course.. But we should be able to get a correct implementation just using the timeline, if what I claim above is true. And I think it is..
A few notable changes I am suggesting:
Much of this is borrowed from the section above, with necessary changes and expansions..
a) Let's take a single partition where 6 commits have written 6 file groups.
b) WriteClient requests a clustering action at instant time 006
The plan is to merge these 6 files into the following two new file groups. This plan is serialized to the `006.clustering.requested` file on the active timeline (i.e. the files directly under .hoodie)
queries will continue listing the partition and simply query file3-file8
c) WriteClient continues to ingest new data at instant time 007, 008,...
d) Clustering completes
There is a `006.commit` on the timeline (much like compaction today). Queries now have to do extra work to avoid listing file3-file8 and instead query file11, file12. To do this, every query must ...
e) Cleaning/Archiving input file groups
It's imperative that the cleaner actually deletes file3-file8 before the 006 instant is moved to the archived timeline.. otherwise, we will have no way of filtering out these clustered input file groups and queries will incur duplicates..
Limitations/Considerations:
liwei
Thanks Vinoth Chandar ,
I strongly agree with your suggestion. I have some thoughts, many similar to yours.
These ideas are still relatively rough. If they are not correct, I hope you will correct them.
Best Regards,
Wei Li.
Shaofeng Li
Vinoth Chandar Thanks for your detailed explanation. One doubt about the limitations/considerations: append-only does not have any updates, it is just append or insert, so it will just create another new file instead of appending the updates into logs. Please correct me if I am wrong.
Vinoth Chandar
Shaofeng Li you are right.. Append only will not be affected by that limitation.. Just saying we need to think more generally while designing.. and for now, we need not handle update scenarios..
Satish Kotha
Hi Vinoth Chandar Nishith Agarwal Sorry for the delay here. I am just starting to look at this and trying to get the full context. I agree with the general direction of the solution and also agree that parts of this can be reused for RFC-18.
I have few follow up questions related to this approach.
Let me know if I'm misinterpreting your suggestions.
Nishith Agarwal
liwei Thanks for your suggestions. The pictures/figures on the RFC are very cool. I think we are all moving towards similar ideas. Hudi already supports different kinds of compaction strategies (which can be labelled as minor or major depending on what strategy you choose) and can run them asynchronously. We need to be able to use the same machinery to perform clustering (moving data files around). Agree with you on (3): if we have a way to index the log file (RFC-08), we can simply write data to log files and then use the compaction machinery to do compaction & collapse/clustering.
Vinoth Chandar mostly agree with what you have jotted down; Satish Kotha and I have similar thoughts on how to implement clustering. Let me share some more high-level thoughts & ideas in addition to this on how we could support this for update workloads as well (I will use collapse/clustering interchangeably, please excuse this nit).
Say the current state of the system is files F1_0, F2_0, F3_0. We schedule a collapse with commit 0' and a file id F4. As commits c1, c2, c3 are happening, we run an asynchronous job to combine files F1, F2, F3 into F4 while, at the same time, they are getting updated. These updates go to log files and the latest view of this data is (F1_0 + F4.log, F2_0 + F4.log, F3_0 + F4.log). Obviously, changes are required to ensure no compaction occurs in the meantime (or was already scheduled) and to define how to read the records from the log file.
MergeOnRead

Files | c1 | c2 | c3 | c4
---|---|---|---|---
F1_0 | F4_0'.log | | | F1_4
F2_0 | | | | F2_4
F3_0 | | | | F3_4
F4_0' (Collapse) | Running Collapse() | | | F4_0' = F4 + current_updates + sum(.log files so far)
Finally, collapse is done at C4 and the new file group is F4_0' + F4_0'.log. There are 2 changes needed before this happens: 1) Need to be able to use the timeline and the clustering.requested file to expose 2 new file system views → CollapsedView & UncollapsedView. These file system views (built using the timeline) can provide the isolation between the 2 file groups. 2) Need to update the index - a) bloom index → since this is a new file slice and the initial ones' (f1, f2, f3) indexes will already be there, it should be safe to update this index in the new file, made available by the atomic commit. b) record index is tricky; we are still fleshing this out.
CopyOnWrite (same initial setting as before)

Files | c1 | c2 | c3 | c4 | Cn
---|---|---|---|---|---
F1_0 | F1_1 | F2_2 | F3_3 | F1_4 |
F2_0 | | | | F2_4 | Fn
F3_0 | | | | F3_4 |
F4 (Collapse) | Running Collapse() | | | F4_4 (F1_0 + F2_0 + F3_0) with bloom filter merged + F1_4 + F2_4 + F3_C4 | Fn
For COW, if it is an update-heavy table, it's slightly trickier. The problem is that all files are getting updated and rewritten while the collapse is happening in the background. So, how do we provide the latest view, or rather, when do we say the collapse has finished?
Some high level ideas :
We're working on some designs, hope to have some details very soon.
Vinoth Chandar
I think its probably okay to assume MOR for a lot of these optimizations, since it supports so much more.. We can leave COW to error out if you want to update a file group being collapsed.. I would not like to introduce additional complexity..
> These updates are going to log files and the latest view of this data is (F1_0 +F4.log, F2_0 +F4.log, F3_0 +F4.log). Obviously, changes required to ensure no compaction occurs in the meantime (or was already scheduled) and how to read the records from the log file.
So, like I mentioned.. I am not sure if this will generalize well.. i.e. what if F1 + F2 + F3 is being collapsed/clustered into F4 + F5? (i.e. multiple output files instead of a single one).. In that case, we cannot easily determine where to route an update.. i.e. when we get an update, how do we know whether we send it to F4's or F5's log? One way to solve this is by having the clustering plan carry granular record-level metadata on where keys are going to be placed, but this will be costly, since it needs actual introspection or even execution of clustering to be able to come up with that.. (so it can't be asynchronous/lightweight like how compaction is getting scheduled)..
I do have a solution for this.. which is to introduce a "global file group" in every partition that we log updates to when a file group is under clustering.. if the global file group is non-empty, we always merge it with every other file group. Clustering will complete and also take care of moving these "updates" (even inserts; we may have to do something like this to allow multi-writer w/ unique constraints.. that's another story) to their respective file groups post clustering... This does feel a bit like option 1 you mention for COW handling..
Even this has a lot of corner cases, but hopefully it gives an idea of what I am thinking.. a more general approach to dealing with updates that supports N → M file group clustering.. But nonetheless, if we start by solving this without updates but in a generalized way, I think that in itself would be a good first step.
Balaji Varadarajan
+1. Supporting clustering at file-groups level would help us be nimble and give us ability to scale up/down the scope of clustering.
I like the idea of logging concurrent updates to a designated file-group while clustering is happening. Maybe we can have one designated ("special") file-group per scheduled clustering action to cleanly transition back to the new clustered file-groups?
The overall idea generalizes well with the rest of the machinery.
Balajee Nagasubramaniam
+1 on the idea of having a special file group for logging concurrent updates during the clustering operation. By making use of the write token, we can solve the file evolution problem for the readers/query engines as well.
When collapsing (f1_w1_c1, f2_w2_c2, f3_w3_c3) into (f4_w4_c4, f5_w5_c4), we could create (f1_EOL_c4, f2_EOL_C4, f3_EOL_c4) empty marker data files. After committing the collapse operation at C4, query engines would only see records present in the (f4_w4_c4, f5_w5_c4) snapshot. EOL is a special write token; when present in the file name, the data file will not be evolved further. InputFormat/query engine integration can safely skip these files (treating them as a special case of empty files). Query engines running on an older version of Hudi should be able to safely ignore empty marker data files as well.
Vinoth Chandar
or alternatively, query engines can merge them as well
Shaofeng Li
Vinoth Chandar Nishith Agarwal Thanks for your feedback. Some new ideas about clustering/collapse:
Clustering would benefit the following three scenarios:
1. We could re-cluster records to improve query performance, as commented in https://issues.apache.org/jira/projects/HUDI/issues/HUDI-112.
2. File group clustering will let Hudi support the log-append scenario more completely, since the writer only needs to insert into Hudi directly without looking up the index or merging small files. This will improve write throughput and reduce write latency, with small files clustered asynchronously.
3. Clustering would enable concurrent writing to a Hudi dataset to a certain extent. Right now, Hudi does not guarantee the correctness of concurrent writing. With file-group clustering, consider multiple writers writing to a Hudi dataset, and say they insert directly (not update) without merging small files; the table will end up with many small files (file groups) written by the different writers, and we then need clustering to combine those file groups into one (or more than one) file group to solve the small file problem, leaving aside concurrent rollback/clean/archive.
liwei
Satish Kotha Hi, thanks so much for providing "Option #2: Clustering with snapshot isolation".
I have some questions and discussion points: Vinoth Chandar Nishith Agarwal Shaofeng Li
1. About the Option #2 implementation:
1.1 Having both clustering and compaction concepts will increase the cost of understanding for users. Can we unify clustering and compaction into one concept, such as clustering or optimize, and within that concept have different modes such as minor compaction, major compaction, and build index?
1.2 As a first step, I think RFC-19 does not need to rely on RFC-08?
2. Future:
2.1 Do we have plans to support update & delete while clustering is in progress? If RFC-08 is ready, can we do this?
2.2 Currently the transaction support in Hudi is not complete, and this implementation will add new info to the commit info.
Is it possible to improve the abstraction of the commit info to support, for example, optimistic locking? Then we could support multi-writer concurrency.
HUDI-944 has started work on this.
Vinoth Chandar
> Can we unify clustering and compaction into one concept such as clustering or optimize,
I actually think these are different operations. One is local to the file group while clustering works across file groups. Overloading them would be confusing imho.
> Do we have plans to support update & delete while clustering is in progress?
Like I mentioned above, we can introduce a global file group to buffer these writes. We can discuss this in the concurrency work as you point out. Initially, even throwing an error and only supporting inserts would be a huge win already. I feel that can be accomplished without much complexity, similar to the compaction design..
this way we can make parallel progress without blocking projects on each other. Hope that makes sense
Vinoth Chandar
liwei Shaofeng Li Satish Kotha
I think our discussions are converging toward the same general direction now? In the interest of making progress, can one of you write down a checkpoint of what we agree on and what is still open?
If there is a way to get started with some simpler form here and iterate, that's preferable. Such a checkpoint would help us identify this MVP.
Shaofeng Li
Satish Kotha Vinoth Chandar liwei Nishith Agarwal
After the discussion, I think we reached some agreement.
Agree on:
Open items:
Plan:
Also, I created an umbrella Jira ticket to track this work: HUDI-1042.
Vinoth Chandar
Clustering with conflicting updates needs more thought like you mentioned .. Shaofeng Li..
In the meantime, we can probably move ahead with implementing the first cut - given we have sufficient agreement there anyway
Vinoth Chandar
Satish Kotha Now that we are almost done with this, mind taking a pass at the RFC once and making it more readable for future use? i.e. merge aspects of the discussion back into the doc.
Ideally, completed RFCs serve as a good source of the actual design. The Kafka project does an amazing job at this, so trying to emulate that.
Satish Kotha
Vinoth Chandar Makes sense. I'll reorganize this page next week cc: liwei
Satish Kotha
Vinoth Chandar I edited this page to reflect all aspects discovered during implementation. I will add performance section later this week with test dataset. But if you have any other feedback on this page, let me know.
cc: liwei
Vinoth Chandar
Satish Kotha thanks for jumping on this. this looks great for now. Gives enough implementation details