Proposers

Approvers

Status

Current state: COMPLETED

Discussion thread:

JIRA: HUDI-897, HUDI-957

Released: <Hudi Version>


Abstract

The business scenarios of the data lake mainly include analysis of databases, logs, and files. One of the key trade-offs in managing a data lake is to choose between write throughput and query performance. For better write throughput, it is usually better to write incoming data into smaller data files. This will increase parallelism and improve ingestion speed substantially. But, this can create many small files. Also, in this approach, data locality is not optimal. Data is co-located with other records in the incoming batch and not with data that is queried often. Small file sizes and lack of data locality can degrade query performance. In addition, for many file systems including HDFS, performance degrades when there are many small files. 

In this proposal, we present a clustering framework for improving write throughput without compromising on query performance. The clustering framework can be used to rewrite data in a different layout. Some example use cases:

  1. Improve freshness: write small files, then stitch them into larger files after certain criteria are met (time elapsed, number of small files, etc.).
  2. Improve query performance: change the data layout on disk by sorting data on (different) columns.


Implementation

The Hoodie write client insert/upsert/bulk_insert operations will continue to function as before. Users can configure the small file soft limit to 0 to force new data to go into a new set of file groups. In addition, a ‘clustering’ action is provided to rewrite data in a different layout. Clustering can run asynchronously or synchronously and provides snapshot isolation between readers and writers. The exact steps taken for clustering are listed below for each table type.

COW Table timeline

In the example flow chart above, we show a partition's state over time (t5 to t9). The sequence of steps taken for writing is listed below.

  1. At t5, a partition in the table has 5 file groups f0, f1, f2, f3, f4. For simplicity, assume that each file group is 100MB. So the total data in the partition is 500MB.
  2. A clustering operation is requested at t6. Similar to compaction, we create a “t6.clustering.requested” file in the metadata with a ‘ClusteringPlan’ that includes all the file groups touched by the clustering action across all partitions. 
    1. Example contents:
    2. { partitionPath: {“datestr”}, oldfileGroups: [ {fileId: “f0”, time: “t0”}, { fileId: “f1”, time: “t1”}, ... ], newFileGroups: [“c1”, “c2”]  }
  3. Let’s say the maximum file size after clustering is configured to be 250MB. Clustering would re-distribute all the data in the partition into two file groups: c1, c2. These file groups are ‘phantom’ and invisible to queries until clustering is complete at t8.
  4. Also, note that records in a file group can be split into multiple file groups. In this example, some records from the f4 file group go to both new file groups, c1 and c2.
  5. While the clustering is in progress (t6 through t8), any upserts that touch these file groups are rejected.
  6. After writing new data files c1-t6.parquet and c2-t6.parquet, if a global index is configured, we add entries in the record level index for all the keys with the new location. The new index entries will not be visible to other writes because there is no commit associated yet.
  7. Finally, we create a commit metadata file ‘t6.commit’ that includes file groups modified by this commit (f0,f1,f2,f3,f4).
  8. Note that file groups (f0 to f4) are not deleted from disk immediately. The cleaner would delete these files before archiving t6.commit. We also update all views to ignore the file groups listed in the commit metadata files, so readers will not see duplicates (a reader-side sketch follows below).
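
To make step 8 concrete, below is a minimal reader-side sketch. The types and helper used here (FileGroup, CommitMetadata, visibleFileGroups) are hypothetical stand-ins, not actual Hudi APIs; the point is only that file groups recorded as replaced in completed commit metadata are filtered out of every view.

// Hypothetical sketch of the view-side filtering in step 8; not actual Hudi code.
case class FileGroup(partitionPath: String, fileId: String)
// partition path -> file ids replaced by a completed clustering commit
case class CommitMetadata(replacedFileIds: Map[String, Seq[String]])

def visibleFileGroups(allFileGroups: Seq[FileGroup],
                      completedCommits: Seq[CommitMetadata]): Seq[FileGroup] = {
  // Collect every (partition, fileId) pair that a completed commit marked as replaced.
  val replaced: Set[(String, String)] =
    completedCommits.flatMap(_.replacedFileIds.toSeq).flatMap {
      case (partition, fileIds) => fileIds.map(partition -> _)
    }.toSet

  // Readers only see file groups that were not replaced, so no duplicates surface
  // even though f0-f4 still exist on disk until the cleaner removes them.
  allFileGroups.filterNot(fg => replaced.contains(fg.partitionPath -> fg.fileId))
}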


Note that there is a possible race condition at step 5 if multiple writers are allowed. Another writer could have started upserts just before the ‘clustering.requested’ file is written. In the initial version, for simplicity, we assume there is only a single writer. The writer can either schedule clustering or run ingestion. The actual clustering operation can run asynchronously. When Hudi has multi-writer support (see RFC-22), we can consider making scheduling asynchronous too.


MOR Table timeline

This is very similar to the COW table. For a MOR table, inserts can go into either parquet files or log files. This approach will continue to support both modes. The output of clustering is always in parquet format. Also, compaction and clustering cannot run at the same time on the same file groups; compaction needs changes to ignore file groups that are already clustered (a small sketch of this guard follows).
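
A minimal sketch of the compaction-side guard mentioned above, with illustrative names rather than actual Hudi APIs: compaction scheduling simply excludes any file group that has already been replaced by clustering or that has a clustering action requested/inflight on it.

// Illustrative only (not actual Hudi APIs): exclude file groups that are already
// replaced by clustering, or that have clustering requested/inflight, from the
// compaction candidates.
def compactionCandidates(allFileGroupIds: Seq[String],
                         clusteredOrPendingClusteringIds: Set[String]): Seq[String] =
  allFileGroupIds.filterNot(clusteredOrPendingClusteringIds.contains)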

Clustering steps


Overall, there are 2 parts to clustering

  1. Scheduling clustering: Create clustering plan
  2. Execute clustering: Process the plan. Create new files and replace old files.

Scheduling clustering

The following steps are used to schedule clustering.

  1. Identify files that are eligible for clustering
    1. Filter specific partitions (based on config to prioritize latest vs older partitions)
    2. Any files that have size > targetFileSize are not eligible for clustering
    3. Any files that have pending compaction/clustering scheduled are not eligible for clustering
    4. Any file groups that have log files are not eligible for clustering (we could remove this restriction at a later stage).
  2. Group files that are eligible for clustering based on specific criteria. Each group is expected to have a data size that is a multiple of ‘targetFileSize’. Grouping is done as part of the ‘strategy’ defined in the plan (a grouping sketch follows this list). We can provide multiple strategies:
    1. Group files based on record key ranges. This is useful because the key range is stored in the parquet footer and can be used for certain queries/updates.
    2. Group files based on commit time. 
    3. Group files that have overlapping values for custom columns 
      1. As part of clustering, we want to sort data by column(s) in the schema (other than row_key). Among the files that are eligible for clustering, it is better to group files that have overlapping data for the custom columns.
        1. We have to read data to find this, which is expensive given the way ingestion works. We can consider storing value ranges as part of ingestion (we already do this for the record key). This requires more discussion. In the short term, we can probably focus on strategy 2a (no support for sorting by custom columns).
        2. Example: say the target of clustering is to produce 1GB files. Partition initially has 8 * 512MB files. (After clustering, we expect data to be present in 4 * 1GB files.)
      2. Assume that among the 8 files only 2 have overlapping data for the ‘sort column’; then these 2 files will be part of one group. The output of the group after clustering is one 1GB file. 
      3. Assume that among the 8 files 4 have overlapping data for the ‘sort column’; then these 4 files will be part of one group. The output of the group after clustering is two 1GB files.
    4. Group random files
    5. We could put a cap on group size to improve parallelism and avoid shuffling large amounts of data 
  3. Filter groups based on specific criteria (akin to orderAndFilter in CompactionStrategy)
  4. Finally, the clustering plan is saved to the timeline. Structure of metadata is here: https://github.com/apache/hudi/blob/master/hudi-common/src/main/avro/HoodieClusteringPlan.avsc
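
To illustrate the grouping in step 2, here is a sketch of the size-based idea (not the actual Hudi strategy code; all names are illustrative): pack eligible files into groups, cap each group to bound the amount of data shuffled, and expect each group to be rewritten into roughly ceil(groupSize / targetFileSize) output file groups.

// Illustrative sketch of size-based grouping; not the actual Hudi plan strategy.
case class EligibleFile(fileId: String, sizeBytes: Long)

def groupFilesBySize(files: Seq[EligibleFile], maxBytesPerGroup: Long): Seq[Seq[EligibleFile]] = {
  val groups = scala.collection.mutable.ListBuffer[Seq[EligibleFile]]()
  var current = Vector.empty[EligibleFile]
  var currentSize = 0L
  // One possible heuristic: pack files in ascending size order.
  for (f <- files.sortBy(_.sizeBytes)) {
    if (current.nonEmpty && currentSize + f.sizeBytes > maxBytesPerGroup) {
      groups += current
      current = Vector.empty
      currentSize = 0L
    }
    current = current :+ f
    currentSize += f.sizeBytes
  }
  if (current.nonEmpty) groups += current
  groups.toSeq
}

// Each group is expected to be rewritten into this many output file groups.
def expectedOutputFileGroups(group: Seq[EligibleFile], targetFileSize: Long): Int =
  math.ceil(group.map(_.sizeBytes).sum.toDouble / targetFileSize).toInt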


In the ‘metrics’ element, we could store ‘min’ and ‘max’ for each column in the file to help with debugging and operations.

Note that this scheduling can be plugged in with a custom implementation. In the first version, a default strategy is provided; a hedged configuration sketch follows.
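
As a hedged illustration of that pluggability, the plan strategy can be selected through a write config. The clustering option keys below are the ones used in the quick start later on this page, plus hoodie.clustering.plan.strategy.class from HoodieClusteringConfig; treat the exact keys and any class names as release-dependent, and note that the class value here is only a placeholder. The remaining required write options (record key, precombine field, table name, etc.) are omitted and are the same as in the quick start.

// Fragment only: clustering-related options, to be combined with the usual write options
// from the quick start below. Key/class names are illustrative and may differ by release.
df.write.format("org.apache.hudi").
  options(getQuickstartWriteConfigs).
  option("hoodie.clustering.inline", "true").
  option("hoodie.clustering.inline.max.commits", "4").
  option("hoodie.clustering.plan.strategy.class", "<fully.qualified.ClusteringPlanStrategy>"). // placeholder
  option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824").
  option("hoodie.clustering.plan.strategy.small.file.limit", "629145600").
  mode(Append).
  save(basePath)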

Running clustering

  1. Read the clustering plan and look at the number of ‘clusteringGroups’. This determines the parallelism.
  2. Create the inflight clustering file.
  3. For each group:
    1. Instantiate the appropriate strategy class with strategyParams (example: sortColumns).
    2. The strategy class defines a partitioner that we can use to create buckets and write the data.
  4. Create replacecommit. Contents are in HoodieReplaceCommitMetadata 
    1. operationType is set to ‘clustering’.
    2. We can extend the metadata and store additional fields to help track important information (the strategy class can return this 'extra' metadata information):
      1. strategy used to combine files
      2. track replaced files


In the first version, a strategy based on 'bulk_insert' is provided as the default option. A programmatic sketch of scheduling and running clustering follows.
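
Below is a programmatic sketch of the two phases above, using the same write-client calls shown in the Performance Evaluation section later on this page. Construction of writeClient (a Spark write client configured with the desired clustering options) is elided; the isPresent/get accessors on the returned option mirror the usage shown there, so treat this as a sketch rather than exact API documentation.

// Assumes `writeClient` was created with the desired clustering write configs
// (see the Performance Evaluation section below for the configs used there).
val clusteringInstant = writeClient.scheduleClustering(org.apache.hudi.common.util.Option.empty())

if (clusteringInstant.isPresent) {
  // The boolean asks the client to also complete (commit) the replacecommit
  // after the new file groups are written.
  val metadata = writeClient.cluster(clusteringInstant.get, true)
}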

Commands to schedule and run clustering

Quick start using Inline Clustering

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_trips_cow"
val basePath = "/tmp/hudi_trips_cow"

val dataGen = new DataGenerator(Array("2020/03/11"))
val updates = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 1));
df.write.format("org.apache.hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  option("hoodie.parquet.small.file.limit", "0").
  option("hoodie.clustering.inline", "true").
  option("hoodie.clustering.inline.max.commits", "4").
  option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824").
  option("hoodie.clustering.plan.strategy.small.file.limit", "629145600").
  option("hoodie.clustering.plan.strategy.sort.columns", ""). // optional, if sorting is needed as part of rewriting data
  mode(Append).
  save(basePath);


Setup for Async clustering Job

Clustering can be scheduled and run asynchronously using a Spark job. The utility Spark job (HoodieClusteringJob) is part of the hudi-utilities bundle.


  1. Prepare the clustering config file:

cat /Users/liwei/work-space/spark/spark-2.4.6-bin-hadoop2.7/hudi_table_with_small_filegroups3/config/clusteringjob.properties
hoodie.clustering.inline.max.commits=2

  2. Schedule clustering:

bin/spark-submit \
--master local[4] \
--class org.apache.hudi.utilities.HoodieClusteringJob \
/Users/liwei/work-space/dla/opensource/incubator-hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.8.0-SNAPSHOT.jar \
--schedule \
--base-path /Users/liwei/work-space/spark/spark-2.4.6-bin-hadoop2.7/hudi_table_with_small_filegroups3/dest \
--table-name hudi_table_with_small_filegroups3_schedule_clustering \
--props /Users/liwei/work-space/spark/spark-2.4.6-bin-hadoop2.7/hudi_table_with_small_filegroups3/config/clusteringjob.properties \
--spark-memory 1g

    You can find the scheduled clustering instant time in the Spark logs, with the log prefix "The schedule instant time is". In this example, the scheduled clustering instant time is 20210122190240.

  3. Use the scheduled instant time "20210122190240" to run clustering:

bin/spark-submit \
--master local[4] \
--class org.apache.hudi.utilities.HoodieClusteringJob \
/Users/liwei/work-space/dla/opensource/incubator-hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.8.0-SNAPSHOT.jar \
--base-path /Users/liwei/work-space/spark/spark-2.4.6-bin-hadoop2.7/hudi_table_with_small_filegroups3/dest \
--instant-time 20210122190240 \
--table-name hudi_table_with_small_filegroups_clustering \
--props /Users/liwei/work-space/spark/spark-2.4.6-bin-hadoop2.7/hudi_table_with_small_filegroups3/config/clusteringjob.properties \
--spark-memory 1g

Some caveats

There is work in progress to fix these limitations, but these issues are worth mentioning:

  1. This is an alpha feature. Although there is good unit test coverage, there may be some rough edges. Please report any issues.
  2. Better support for async clustering is coming soon.
  3. Clustering doesn't work with the incremental timeline, so disable it by setting "hoodie.filesystem.view.incr.timeline.sync.enable" to "false".
  4. Incremental queries are not supported with clustering. Incremental queries consider all the data written by clustering as new rows.
  5. Clustering creates a new type of commit, "timestamp.replacecommit". There may be some places in the code where we only read commits/deltacommits and miss replacecommits when reading valid commits from the timeline. This can cause discrepancies in some cases.
  6. The clean policy is different for 'replacecommit', so more versions may be retained, leading to extra storage usage.

Performance Evaluation

Dataset: https://s3.amazonaws.com/amazon-reviews-pds/readme.html

Query: select sum(total_votes), product_category from amzn_reviews where review_date > '2007' and review_date < '2009' group by product_category

  1. Convert dataset to hoodie format

    val df = spark.read.option("sep", "\t").option("header", "true").csv(amznReviewsRawDataPath)
    
    val tableName = "reviews"
    
    df.write.format("org.apache.hudi").
      options(getQuickstartWriteConfigs).
      option(PRECOMBINE_FIELD_OPT_KEY, "customer_id").
      option(RECORDKEY_FIELD_OPT_KEY, "review_id").
      option(PARTITIONPATH_FIELD_OPT_KEY, "marketplace").
      option(OPERATION_OPT_KEY, "insert").
      option(TABLE_NAME, tableName).
      mode(Overwrite).
      save(amznReviewHudiPath);
    
    //creates ~500 data files in one partition
  2. Evaluate query time (No Clustering)

    query takes ~10 seconds
    scala> spark.time(spark.sql("select sum(total_votes), product_category from amzn_reviews where review_date > '2007' and review_date < '2009' group by product_category").collect())
    Time taken: 10018 ms     
    
    

    (See attached screenshots: stage details, query plan.)

  3. Perform clustering

    // create writeClient with overriding following write config:
    //"hoodie.clustering.plan.strategy.sort.columns" -> "product_category,review_date"
    //"hoodie.clustering.plan.strategy.max.bytes.per.group" -> "107374182400"
    //"hoodie.clustering.plan.strategy.max.num.groups" -> "1"
    
    val clusteringInstant = writeClient.scheduleClustering(Option.empty())
    val metadata = writeClient.cluster(clusteringInstant.get, true)
    
    //creates ~350 data files and replaces the existing ~500 data files in one partition

    Verify replacecommit is created

    $ hadoop fs -ls $amznReviewHudiPath/.hoodie/
    Found 10 items
    drwxr-xr-x   - satish           0 2021-01-20 18:38 $amznReviewHudiPath/.hoodie/.aux
    drwxr-xr-x   - satish           0 2021-01-21 00:49 $amznReviewHudiPath/.hoodie/.temp
    -rw-r--r--   3 satish      445621 2021-01-20 18:41 $amznReviewHudiPath/.hoodie/20210120183848.commit
    -rw-r--r--   3 satish           0 2021-01-20 18:39 $amznReviewHudiPath/.hoodie/20210120183848.commit.requested
    -rw-r--r--   3 satish         979 2021-01-20 18:40 $amznReviewHudiPath/.hoodie/20210120183848.inflight
    -rw-r--r--   3 satish      493983 2021-01-21 00:51 $amznReviewHudiPath/.hoodie/20210121004731.replacecommit
    -rw-r--r--   3 satish           0 2021-01-21 00:47 $amznReviewHudiPath/.hoodie/20210121004731.replacecommit.inflight
    -rw-r--r--   3 satish      131495 2021-01-21 00:47 $amznReviewHudiPath/.hoodie/20210121004731.replacecommit.requested
    drwxr-xr-x   - satish           0 2021-01-20 18:38 $amznReviewHudiPath/.hoodie/archived
    -rw-r--r--   3 satish         228 2021-01-20 18:38 $amznReviewHudiPath/.hoodie/hoodie.properties

  4. Evaluate query time (with clustering). Note that the same query from step 2 that took ~10 seconds now runs in ~4 seconds.

    query takes ~4 seconds
    scala> spark.time(spark.sql("select sum(total_votes), product_category from amzn_reviews where review_date > '2007' and review_date < '2009' group by product_category").collect())
    Time taken: 4099 ms      

    (See attached screenshots: stage details, query plan.)

Summary

In summary, rewriting the data using clustering reduced query runtime by ~60% (from ~10 seconds to ~4 seconds) in this experiment.

Rollout/Adoption Plan

  • No impact on existing users, because clustering adds new, optional functionality. Note that if you are already using Hudi, it is important to move your readers to the 0.7 release before upgrading writers, because clustering creates a new type of commit and it is important that query engines recognize this new commit type.

Test Plan

  • Unit tests
  • Integration tests
  • Test on the cluster for a larger dataset. 



24 Comments

  1. I like where you are going with this in spirit.. let's work through this..

    We already have an async compaction framework, so that's not the issue.. rather, the problem is how/if we allow upserts on top of such a dataset eventually... if so, there are some considerations.. 

    1. Thanks, Vinoth Chandar
      Just like https://issues.apache.org/jira/projects/HUDI/issues/HUDI-112, we need a mechanism to solve two issues.
      1. On the write side: avoid compaction for faster writes (merge on read can currently solve this problem).
      2. Compaction and read: we also need a mechanism to collapse older, smaller files into larger ones while keeping the query cost low (with merge on read, if we do not compact, real-time reads will be slow).

      We have an option:
      1. On the write side: just write parquet, no compaction.
      2. Compaction and read: because the small files are parquet, real-time reads can be fast; users can also use asynchronous compaction to collapse older, smaller parquet files into larger parquet files.

      Best Regards,
      Wei Li.

  2. Satish Kotha is looking into how to support collapse-style operations at Uber ( https://issues.apache.org/jira/projects/HUDI/issues/HUDI-112 ) and scoping out the work. This will not only allow us to merge small parquet files asynchronously but also move data around for better compression, query performance etc.

    cc Vinoth Chandar liwei

    1. Thanks, Nishith Agarwal

      Our issue is just like https://issues.apache.org/jira/projects/HUDI/issues/HUDI-112. Do you know the current progress of this issue? (smile)

      Best Regards,
      Wei Li.


  3. Satish Kotha Nishith Agarwal liwei Shaofeng Li Balaji Varadarajan Bhavani Sudha, cc'ing everyone here. 

    Summarizing thoughts on whether we can achieve these goals without assuming RFC-15 is done ... The more I thought about it, the more I felt like our way of modelling transaction actions using a log of events in the timeline is the right way - databases have done this for decades (unlike other systems which have gone ahead and treated a snapshot as the source of truth).. RFC-15 will make a lot of this metadata consolidation more performant/efficient, of course.. But we should be able to get a correct implementation just using the timeline, if what I claim above is true. And I think it is..


    A few notable changes I am suggesting: 

    • First of all, let's give this a better action name (IMHO): `clustering` (since it clusters file groups together based on some criteria; we will get to these later). We will continue referring to what we do today as `compaction`.
    • Let's implement this as a "write mode", rather than a new append API? I would like to keep things simple: insert, delete, update.. like it is now.. As you will see below, what I am suggesting is a generalization of what was proposed in the RFC. If we are going to collapse file groups, then we might as well do things like sorting (we already support this for bulk_insert alone) to speed up queries. Also, a user may want to do this clustering without needing to write small files/ingest quickly as well.. 
    • We should assume that we will cluster N input file groups into M output file groups, not just 1 output file group. Say we want a target file size of 256MB; then it might turn out all your accumulated small groups are worth about 450MB, requiring two file groups instead of one. (This introduces a few limitations, as we will see.)


    Much of this is borrowed from the section above, with necessary changes and expansions.. 


    a) Let's take a single partition where 6 commits have written 6 file groups. 

    | ---- file3-000.parquet 
    | ---- file4-001.parquet 
    | ---- file5-002.parquet
    | ---- file6-003.parquet 
    | ---- file7-004.parquet 
    | ---- file8-005.parquet


    b) WriteClient requests a clustering action at instant time 006

    The plan is to merge these 6 files into the following two new file groups. This plan is serialized to the `006.clustering.requested` file on the active timeline (i.e. the files directly under .hoodie). 

    file9-006.parquet
    file10-006.parquet

    Queries will continue listing the partition and simply query file3-file8.

    c) WriteClient continues to ingest new data at instant time 007, 008,... 

    file11-007.parquet
    file12-008.parquet

    d) Clustering completes

    There is a `006.commit` on the timeline (much like compaction today). Queries have to do extra work now to avoid listing file3-file8 and instead query file9-file12. To do this, every query must 

    • Determine the latest file slice for each file group as it's done today
    • Additionally, look at all the completed clustering in the timeline & filter out all the input file groups (i.e we will filter out file3-file8, by examining 006.commit metadata).
      • Note that we can re-cluster with the output of a previous clustering, as the input.. So ordering is important when doing this. 
      • Also note that compaction scheduling should also skip such file groups. 

    e) Cleaning/Archiving input file groups 

    It's imperative that the cleaner actually deletes file3-file8 before the 006 instant is moved to the archived timeline.. otherwise, we will have no way of filtering out these clustered input file groups and queries will incur duplicates.. 


    Limitations/Considerations : 

    1. We cannot have any updates to the file groups being clustered.. If we want to, our design will have to accommodate either a new logging layer to stage these updates and merge later, (or) track a record-level mapping of input to output record locations in the clustering plan... As long as we can error out updates during clustering, it may be okay for now? (We can just target append-only use-cases for now.) 
    2. When we have RFC-08, we also need to issue an update for the relocated records..





    1. Thanks Vinoth Chandar,

      I strongly agree with your suggestions. I have some thoughts, many similar to yours.

      1. Before doing the clustering (compaction), we need to enhance the timeline. Two things we can do: 1) RFC-15 will make a lot of this metadata consolidation more performant/efficient; 2) support more complete concurrency control ( HUDI-944 - Hudi support for more complete concurrency control when writing data).
      2. Clustering (compaction): we can take HBase as an example and construct minor compaction and major compaction mechanisms. These compactions can run asynchronously in a compaction server or client.
      3. About fast ingest: Hudi can enhance merge on read, write the real-time data to log files and compact them into base parquet files, and use minor and major compaction to merge log files into base files and to compact file groups. Also, the log file format could be parquet or avro. But before doing this, we need to complete RFC-08.
      These ideas are still relatively primitive. If they are not correct, I hope you will correct them.

      Best Regards,
      Wei Li.



    2. Vinoth Chandar Thanks for your detailed explanation. A doubt about the limitations/considerations: append-only does not have any updates; it is just append or insert, so it will just create another new file instead of appending the updates into logs. Please correct me if I am wrong.

    3. Shaofeng Li you are right.. Append only will not be affected by that limitation.. Just saying we need to think more generally while designing.. and for now, we need not handle update scenarios.. 

      1. Hi Vinoth Chandar Nishith Agarwal  Sorry for the delay here. I am just starting to look at this and trying to get the full context. I agree with the general direction of the solution and also agree that parts of this can be reused for RFC-18.

        I have a few follow-up questions related to this approach.

        1. Regarding metadata lookup on read side. 
          1. There could be multiple clustering commits in the active timeline. So, Hudi has to read all active commit files to gather the list of file groups to filter in the query path? Or do you see this list carried from earlier commits into the latest commit? 
          2. The commit metadata files are also global, so we likely need to include a partition-to-file-groups mapping in the metadata file. This metadata file can get pretty large in some cases. Because this is JSON, it may take some changes in Hudi to read large metadata files without requiring too much memory.
          3. (I agree RFC-15 would help significantly here.) 
        2. One of the reasons we are considering this clustering is to improve ingestion speed. So we want to write small parquet files (250/500MB instead of 1GB) for COW tables and coalesce small files into large parquet files asynchronously when compute is cheaper. In one of your comments, you mentioned this is not advised. Can you share the reason, for me to get up to speed? cc: Nishith Agarwal let me know if you have more context here.
        3. Also, with the above example, if there are updates to file groups file11, file12 after  clustering at 006 is complete, then updates to these file groups can be slower because file11, file12 are larger. These updates could cause uneven performance on ingestion side. Is there any additional tuning that we can do here? Is this not expected to happen in common case? 
        4. A kafka partition for ingestion is consumed sequentially IIUC. So if we fail updates while clustering is in progress, we could end up creating large backlog of data to ingest (if clustering takes longer). Is this acceptable for initial design?  Do you guys have thoughts on how to improve this in future? If possible, I want to discuss potential design options here even if we chose to not implement this in initial version.


        Let me know if I'm misinterpreting your suggestions.

  4. liwei Thanks for your suggestions. The pictures/figures on the RFC are very cool (smile) I think we are all moving towards similar ideas. Hudi already supports different kinds of compaction strategies (which can be labelled as minor or major depending on what strategy you choose) and can run them asynchronously. We need to be able to use the same machinery to perform clustering (moving data files around). Agree with you on (3): if we have a way to index the log file (RFC-08), we can simply write data to log files and then use the compaction machinery to do compaction & collapse/clustering. 


    Vinoth Chandar mostly agree on what you have jotted down, Satish Kotha and I have similar thoughts on how to implement clustering. Let me share some more high level thoughts & ideas in addition to this on how we could support this for update workloads as well (I will use collapse / clustering interchangeably, please excuse this nit) 


    Say the current state of the system is with files F1_0, F2_0, F3_0. We schedule a collapse with commit 0' with a file id F4. As commits c1, c2, c3 are happening, we are running an asynchronous job to combine files F1, F2, F3 into F4 while at the same time they are getting updated. These updates are going to log files and the latest view of this data is (F1_0 + F4.log, F2_0 + F4.log, F3_0 + F4.log). Obviously, changes are required to ensure no compaction occurs in the meantime (or was already scheduled), and to define how to read the records from the log file.


    MergeOnRead

    | Files            | c1                 | c2 | c3 | c4                                                    |
    | F1_0             | F4_0'.log          |    |    | F1_4                                                  |
    | F2_0             |                    |    |    | F2_4                                                  |
    | F3_0             |                    |    |    | F3_4                                                  |
    | F4_0' (Collapse) | Running Collapse() |    |    | F4_0' = F4 + current_updates + sum(.log files so far) |



    Finally, collapse is done at C4 and the new file group is F4_0' + F4_0'.log. There are 2 changes needed before this happens: 1) We need to be able to use the timeline and the clustering.requested file to expose 2 new file system views → CollapsedView & UncollapsedView. These file system views (built using the timeline) can provide the isolation between the 2 sets of file groups. 2) We need to update the index: a) bloom index → since this is a new file slice and the initial ones' (f1, f2, f3) indexes will already be there, it should be safe to update this index in the new file made available by the atomic commit; b) the record index is tricky, we are still fleshing this out.


    CopyOnWrite (same initial setting as before)

    | Files         | c1                 | c2   | c3   | c4                                                                   | Cn |
    | F1_0          | F1_1               |      |      | F1_4                                                                 |    |
    | F2_0          |                    | F2_2 |      | F2_4                                                                 | Fn |
    | F3_0          |                    |      | F3_3 | F3_4                                                                 |    |
    | F4 (Collapse) | Running Collapse() |      |      | F4_4 (F1_0+F2_0+F3_0) with bloom filter merged + F1_4 + F2_4 + F3_C4 | Fn |


    For COW, if it is an update-heavy table, it's slightly trickier. The problem is that all files are getting updated and rewritten as the collapse is happening in the background. So, how do we provide the latest view, or rather, when do we say collapse has finished?

    Some high level ideas :

    1. To keep collapsing till we reach a terminal state when that file has no more updates
      1. Till then, we need to keep the collapsed file view and the uncollapsed file view to decide which one to provide to the query
    2. The collapse algorithm has some intelligence and looks at the frequency of updates for the files to be collapsed
      1. Only choose files which don't change any more (collapse data older than 2-3 days if the data layout is time-ordered and updates to older data are not that frequent)?
      2. We assume that for fast-moving tables we use MOR, and COW is for tables receiving fewer updates (and hence larger)
    3. To find whether we have reached terminal state
      1. Store information in F4 footer
      2. Use consolidated metadata and enhance

    We're working on some designs, hope to have some details very soon.

    1. I think it's probably okay to assume MOR for a lot of these optimizations, since it supports so much more.. We can leave COW to error out if you want to update a file group being collapsed.. I would not like to introduce additional complexity.. 


      These updates are going to log files and the latest view of this data is (F1_0 +F4.log, F2_0 +F4.log, F3_0 +F4.log). Obviously, changes required to ensure no compaction occurs in the meantime (or was already scheduled) and how to read the records from the log file.


      So, like I mentioned.. I am not sure if this will generalize well.. i.e. what if F1 + F2 + F3 is being collapsed/clustered to F4 + F5? (i.e. multiple output files instead of a single one).. In that case, we cannot easily determine where to route an update.. i.e. when we get an update, how do we know whether we send it to F4's or F5's log? One way to solve this is by having the clustering plan carry granular record-level metadata on where keys are going to be placed. But this will be costly, since it needs actual introspection or even execution of clustering to be able to come up with that.. (so it can't be asynchronous/lightweight like how compaction is getting scheduled).. 

      I do have a solution for this.. which is to introduce a "global file group" in every partition that we log updates to when a file group is under clustering.. if the global file group is non-empty, we always merge it with every other file group. Clustering will complete and also take care of moving these "updates" (even inserts, we may have to do something like this to allow multi-writer w/ unique constraints.. that's another story) to their respective file groups post clustering... This does feel a bit like option 1 you mention for COW handling..  

      Even this has a lot of corner cases, but hopefully it gives an idea about what I am thinking.. a more general approach to dealing with updates that supports N → M file group clustering.. But nonetheless, if we start by solving this without updates but in a generalized way, I think that in itself would be a good first step.




      1. +1. Supporting clustering at the file-group level would help us be nimble and give us the ability to scale up/down the scope of clustering.

        I like the idea of logging concurrent updates to a designated file-group while clustering is happening. Maybe we can have one designated ("special") file-group per scheduled clustering action, to cleanly transition back to the new clustered file-groups?

        The overall idea  generalizes well with rest of the machinery.

      2. +1 on the idea of having a special file group for logging concurrent updates during the clustering operation. By making use of the write token, we can solve the file evolution for the readers/query engines as well.

        When collapsing (f1_w1_c1, f2_w2_c2, f3_w3_c3) into (f4_w4_c4, f5_w5_c4), we could create (f1_EOL_c4, f2_EOL_C4, f3_EOL_c4) empty marker data files. After committing the collapse operation at C4, query engines would only see records present in the (f4_w4_c4, f5_w5_c4) snapshot. EOL is a special write token; when present in the file name, the data file will not be evolved further. InputFormat/query engine integration can safely skip these files (treating them as a special case: empty files). Query engines running on an older version of Hudi should be able to safely ignore empty marker data files as well.

      3. or alternatively, query engines can merge them as well 

  5. Vinoth Chandar Nishith Agarwal Thanks for your feedback, some new ideas about the clustering/collapse

    With the help of clustering, Hudi would benefit in the following three scenarios:

    1. We could re-cluster records to improve query performance, as commented in https://issues.apache.org/jira/projects/HUDI/issues/HUDI-112.
    2. File group clustering will make Hudi support the log-append scenario better, since the writer only needs to insert into Hudi directly without looking up the index or merging small files. This will improve write throughput and reduce write latency, with small files clustered asynchronously.
    3. Clustering would enable concurrent writing to a Hudi dataset to a certain extent. Right now, Hudi does not guarantee the correctness of concurrent writing. With the help of file group clustering, consider multiple writers writing to a Hudi dataset; let's also say they insert directly (not update) without merging small files. This will end up with many small files (file groups) written by the multiple writers, and we need to cluster those file groups into one (or more than one) file group to solve the small file problem (not considering concurrent rollback/clean/archive).

  6. Satish Kotha  Hi, thanks so much for providing "Option #2: Clustering with snapshot isolation". 

    I have some questions and discussion points: Vinoth Chandar Nishith Agarwal Shaofeng Li

    1. About the Option #2 implementation: 

        1.1 Having both clustering and compaction concepts will increase the cost for users to understand the system. Can we unify clustering and compaction into one concept, such as clustering or optimize, and within that concept have different modes such as minor compaction, major compaction, and build index? 

        1.2 As a first step, I think RFC-19 does not need to rely on RFC-08?

    2. Future:

        2.1 Do we have plans to support update & delete while clustering is in progress? If RFC-08 is ready, can we do this?

        2.2 Currently the transaction support in Hudi is not perfect, and this implementation will add new info to commitinfo. 

    Is it possible to improve the abstraction of commitinfo to support features such as optimistic locking? Then we could support multi-writer concurrency.
    Now HUDI-944 has begun to address this.



  7. Can we unify clustering and compaction into one concept such as  clustering or optimize,

    I actually think these are different operations. One is local to the file group while clustering is across file groups. Overloading them would be confusing IMHO.


    1 Do we have plans  to support update&delete during the clustering in process.

    Like I mentioned above, we can introduce a global file group to buffer these writes. We can discuss this in the concurrency work, as you point out. Initially, even throwing an error and only supporting inserts would be a huge win already. I feel that can be accomplished without much complexity, similar to the compaction design..


     this way we can make parallel progress without blocking projects on each other. Hope that makes sense(smile)


  8. liwei Shaofeng Li Satish Kotha

    I think our discussions are winding down to the same general direction now? In the interest of making progress, can one of you list down a checkpoint of what we agree on and what is still open? 

    If there is a way to get started on some simpler form here and iterate, that's preferable? Such a checkpoint would help us identify this MVP.


  9. Satish Kotha Vinoth Chandar liwei Nishith Agarwal 

    After the discussion, I think we reached some agreement.

    Agree on:

    • Support asynchronous clustering; during clustering, updates for both table types (CoW and MoR) are not supported in phase 1.
    • Synchronous clustering would be easily supported.
    • Supporting or not supporting updates would be pluggable strategies, so other strategies could easily be added in the future.

    Open items:

    • Is clustering at the file level or the record level? That is to say, can all records in one file get into more than one clustered parquet file? I think we should first implement the file level since it is much simpler; this should also be pluggable.
    • How to support updates on MoR during clustering: Nishith's idea is very good, but please consider the case where the clustering never finishes (fails all the time); the updates in F4_0'.log would be lost. Here are my two cents, taking this picture as an example:

    1. At t7, there are some updates to f0-t0.parquet, say creating f0-t0-1.log.
    2. After clustering finishes at t8, any updates to the previous f0,f1,f3,f4,f5 would update c1-t8.parquet. If at t10 there is an update to c1-t8.parquet, it will create a log file; note that the log file version accounts for the delta updates (say during clustering only f0-t0-1.log was created), so the log version will be 2 (1 = sum(delta logs) + 1) and the log file name is c1-t8-2.log. We would rename the delta log (f0-t0-1.log) to c1-t8-1.log, and after that we would use compaction to compact the file group (c1-t8.parquet, c1-t8-1.log, c1-t8-2.log). This should be at the file level rather than the record level, that is to say, all records in one file would go into one clustered parquet file rather than two or more.
    3. From the query side, it is like what Nishith said: if we query at t8 (clustering done), the view is c1-t8.parquet + c2-t8.parquet + f0-t0-1.log, and if we query at t9, the view is c1-t8.parquet + c2-t8.parquet + c1-t8-1.log (renamed from f0-t0-1.log) + c1-t8-2.log.

    Plan:

    • We could first support CoW clustering and the clustering framework, without supporting updates.
    • Then MoR.
    • Then MoR clustering with updates supported.
    • Then CoW clustering with updates supported.

    Also, I created an umbrella JIRA ticket to track this work: HUDI-1042.

  10. Clustering with conflicting updates needs more thought like you mentioned .. Shaofeng Li.. 


    In the meantime, we can probably move ahead with implementing the first cut - given we have sufficient agreement there anyway

  11. Satish Kotha Now that we are almost done with this, mind taking a pass at the RFC and making it more readable for future use? i.e. merge aspects of the discussion back into the doc. 

    Ideally, completed RFCs serve as a good source of the actual design. The Kafka project does an amazing job at this, so trying to emulate that (smile) 


    1. Vinoth Chandar Makes sense. I'll reorganize this page next week cc: liwei

    2. Vinoth Chandar  I edited this page to reflect all aspects discovered during implementation. I will add a performance section later this week with a test dataset. But if you have any other feedback on this page, let me know.

      cc: liwei

    3. Satish Kotha thanks for jumping on this. This looks great for now; it gives enough implementation details.