NOTE

This RFC is deprecated. The updated version is here.


Proposers

Approvers

Status

Current state: "UNDER DISCUSSION"

Discussion thread: here

JIRA: here

Released: <Hudi Version>

Abstract

Hudi tables support many operations, a very popular one being upsert(). To support upserts, Hudi depends on an indexing scheme to route each incoming record to the correct file.

Currently, Hudi supports partitioned and non-partitioned datasets. A partitioned dataset is one that groups files (data) into buckets called partitions; a Hudi dataset may be composed of N partitions with M files. This structure helps canonical Hive/Presto/Spark queries limit the amount of data read by using the partition as a filter. In most cases, the value of the partition/bucket is derived from the incoming data itself. The requirement is that once a record is mapped to a partition/bucket, this mapping should be a) known to Hudi and b) constant for the lifecycle of the dataset, so that Hudi can perform upserts on the record.

Consequently, in a non-partitioned dataset one can think of this problem as a record key <-> file id mapping that is required for hudi to be able to perform upserts on a record.  

The current solution is either a) for the client/user to provide the correct partition value as part of the payload, or b) to use a GlobalBloomIndex implementation that scans all the files under a given path (say, a non-partitioned table). In both cases, we are limited either by the user's ability to provide this information or by the performance overhead of scanning every file's bloom index.

The proposal is to implement a new index format that is a mapping of (recordKey <-> partition, fileId) or ((recordKey, partitionPath) → fileId). This mapping will be stored and maintained by Hudi as another implementation of HoodieIndex and will address the 2 limitations mentioned above.

Background

Types of datasets

The HUDI storage abstraction is composed of 2 main components: 1) the actual data stored, and 2) an index that helps in looking up the location (file_id) of a particular record key. Without this information, HUDI cannot perform upserts to datasets. We can broadly classify all datasets ingested into the data lake into 2 categories.

Insert/Event data

Insert or event data is one where every new entry written to that table is mutually disjoint from any other entry written previously to that table. More concretely, each row in the table is a new row and has no overlap with a previously written row. You can imagine a table that is ingesting logs from an application, each new log is a new entry into the table, with little or no relation to the log entries written before it. A new entry into such a dataset hence does not require ANY context from the previous entries in the dataset to determine where this entry should be written to. 

Upsert/Change Log data

An upsert/change log dataset presents a different challenge. Any entry written to such a dataset may depend on or belong to a previously written row. More concretely, each row in the table is not necessarily a new row and may overlap with a previously written row. In such a scenario, the system needs to determine which row this new update should be written to, and hence which file id the update should be routed to.

The 3 approaches used by consumers of HUDI are as follows:

  1. The data layout is divided into buckets called partitions, and each partition consists of N files. For updatable tables, the client maintains a mapping of record key <-> file id and provides the partition information before passing the records off to Hudi for processing. The HoodieBloomIndex implementation scans the bloom indexes of all files under that partition and, on a match, verifies that the record key lies in that file. This way Hoodie is able to “tag” the incoming records with the correct file ids.
  2. The data layout is a flat structure, where a single directory contains all the files for the updatable table. The GlobalHoodieBloomIndex implementation scans the bloom indexes of all files and, on a match, verifies that the record key lies in that file. This way Hoodie is able to “tag” the incoming records with the correct file ids. The difference from the first approach is that if the number of files under the base path is very large, the indexing time can blow up.
  3. For append-only datasets which do not require updating, simply derive the partition from the current payload, e.g. the current timestamp.

Irrespective of the type of dataset in use (append-only or append + update), index lookup plays a very critical role in both read and write performance. If we can provide record-level indexing to the file id, and possibly the partition (depending on the use case), without adding too much latency, it will make Hudi more performant.

So this RFC aims at providing a record level indexing capability to Hoodie for faster lookups. 

Note: For explanation purposes, let's consider a non-partitioned dataset, so the key is the recordKey and the value is (PartitionPath, FileId).

Implementation



Hash based indexing 

Index entries are hashed into buckets, and each bucket holds a mapping of recordKey to <PartitionPath, FileId>. The total number of buckets has to be pre-determined for the dataset and can't be updated later, but each bucket can scale to more than one file group based on the load (more on file groups later in this section). With 1000 buckets and 1M entries per bucket, the index is capable of serving 1 billion entries, so more file groups per bucket are required only if the dataset grows beyond 1B unique entries.

Each bucket will expose two APIs, namely getRecordLocations(JavaRDD<RecordKeys>) and insertRecords(JavaPairRDD<RecordKey, Tuple2<PartitionPath, FileId>>).
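
For illustration, here is a minimal, single-node sketch of the bucket contract described above (the actual APIs would operate on Spark RDDs as noted); the interface name and the RecordLocation shape are assumptions, not final Hudi interfaces:

```java
import java.util.List;
import java.util.Map;

/**
 * Illustrative sketch of the per-bucket index API described in this RFC.
 * Names and types are placeholders.
 */
interface IndexBucket {

  /** Simple (partitionPath, fileId) value stored against each record key. */
  class RecordLocation {
    final String partitionPath;
    final String fileId;

    RecordLocation(String partitionPath, String fileId) {
      this.partitionPath = partitionPath;
      this.fileId = fileId;
    }
  }

  /** Looks up locations for the given record keys; keys absent from the index are absent from the map. */
  Map<String, RecordLocation> getRecordLocations(List<String> recordKeys);

  /** Inserts new (recordKey -> location) mappings; each batch becomes a new sorted HFile in the bucket. */
  void insertRecords(Map<String, RecordLocation> records);
}
```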

Storage

We plan to use HFile (link1, link2) for storage purposes to leverage its random read capability. You can find details about HFile benchmarking here. The short summary: with an HFile containing 1M entries, a lookup of 100k entries takes ~600 ms at p95. If we can achieve the same in a live system, it would be great for Hudi.

Index Write path (to be used in Hoodie write path)

On the ingest path, once the HoodieRecordLocation is obtained for all new (to-be-inserted) entries, these records are mapped into (RecordKey, <PartitionPath, FileId>) pairs and hashed by recordKey into a bucket. The bucket mapping of a record can never change, and hence neither can the hashing function for a given dataset. All writes to a single HFile have to be sorted, so every new batch will end up creating a new HFile in the respective bucket; hence every bucket will contain N HFiles, and we definitely need compaction to keep the number of HFiles in check. Also, remember that there are no updates to values in this index, since a record's location in Hoodie can never change. This characteristic will help us parallelize the read path.
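
A rough Spark (Java) sketch of this write path; Spark's HashPartitioner stands in for the fixed bucket-hashing function, numBuckets is the pre-determined bucket count, and writeSortedHFile is a hypothetical placeholder for the actual HFile writing:

```java
import java.util.Iterator;

import org.apache.spark.HashPartitioner;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

// Illustrative write path: hash each record key to a bucket, sort the keys
// within each bucket, and persist the batch as one new HFile per bucket.
public class IndexWriteSketch {

  public static void writeIndexBatch(JavaPairRDD<String, Tuple2<String, String>> keyToLocation,
                                     int numBuckets) {
    keyToLocation
        // One Spark partition per bucket; keys are sorted within each bucket.
        .repartitionAndSortWithinPartitions(new HashPartitioner(numBuckets))
        .foreachPartition(entries -> writeSortedHFile(TaskContext.getPartitionId(), entries));
  }

  // Placeholder: a real implementation would open a new HFile for this bucket
  // and append the already-sorted (recordKey -> (partitionPath, fileId)) entries.
  private static void writeSortedHFile(int bucketId,
                                       Iterator<Tuple2<String, Tuple2<String, String>>> sortedEntries) {
  }
}
```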

Parallelism: The ideal parallelism for writes is the total number of buckets, since each batch of ingest can create at most one HFile per bucket.

A few things need attention during implementation: the data write and the index write should be coupled and ACID compliant, i.e. either both are committed or both are rolled back, and no partial data is seen by any reader midway.

Updates

Maybe as of today Hoodie's record location is immutable, but we can't say that will hold true in the future, so this index should be capable of handling updates to the mapping. In such cases, multiple values will be returned (for example, if HFile1 returns FileId1 for record1 and HFile3 returns FileId5 for record1, we would take HFile3's value, and hence record1's location is FileId5). For the commit timestamp, we should rely on either the file name or the commit metadata instead of embedding the commit timestamp in the value of every entry, which would definitely bloat the size of the index.

Index Read path (to be used in Hoodie read/update path)

On both the read and update paths, each record's location has to be known before proceeding to read or write. So getRecordLocations(JavaRDD<RecordKeys>) will be called with the record keys from the input batch. These records will be hashed into their respective buckets, and each bucket in turn will do the HFile lookups.

Parallelism: As noted earlier, since there are no updates to entries in the index, a single record can only be present in one HFile within a bucket. So, ideally, every HFile could be looked up in parallel. For instance, if we have 100 buckets with 10 HFiles each, we could set the parallelism to 1000 to get the maximum benefit.

Index Delete (to be used in Hoodie write path)

We can do a typical tombstone by adding an entry to the bucket with a null value. During index lookup, we could still maintain the same parallelism to look up all HFiles in parallel within a given bucket, but choose the latest value if multiple values are returned (for example, if HFile1 returns FileId1 for record1 and HFile3 returns null for record1, we would take HFile3's value, and hence record1 is considered deleted). For the commit timestamp, we should rely on either the file name or the commit metadata instead of embedding the commit timestamp in the value of every entry, which would definitely bloat the size of the index.
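
A small sketch of this resolution rule (covering both updated locations and tombstones), assuming the commit time can be recovered from the HFile name or commit metadata; the CandidateValue shape and method names are illustrative:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Illustrative resolution of multiple candidate values for one record key:
// the value from the latest commit wins, and a null location means a tombstone.
public class IndexValueResolver {

  /** Hypothetical candidate returned by one HFile lookup within a bucket. */
  public static class CandidateValue {
    final String commitTime;     // derived from the HFile name or commit metadata
    final String partitionPath;  // null for a tombstone
    final String fileId;         // null for a tombstone

    public CandidateValue(String commitTime, String partitionPath, String fileId) {
      this.commitTime = commitTime;
      this.partitionPath = partitionPath;
      this.fileId = fileId;
    }
  }

  /** Returns the winning location, or empty if the latest entry is a tombstone (record deleted). */
  public static Optional<CandidateValue> resolve(List<CandidateValue> candidates) {
    return candidates.stream()
        .max(Comparator.comparing((CandidateValue c) -> c.commitTime)) // latest commit wins
        .filter(latest -> latest.fileId != null);                      // null value => deleted
  }
}
```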

Adding delete support might complicate compaction a bit. Because of this, compaction might have to read the entire contents of all HFiles being compacted in order to find the resolved values for all entries before writing the new base HFile; hopefully this does not add too much overhead. Also, compaction will drop deleted entries to avoid unnecessary storage, so it may not be possible to determine whether a record was never inserted or was deleted after insertion. Let me know if this is an issue for Hoodie.

Note: One point to remember is that we should also support re-insertion of deleted entries. The above scheme works for this use case as well, without any modifications.

Hashing

As a default implementation, we could just hash the record key as is (Java hash), but we should also add support for developers to define their own hashing function.
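
A minimal sketch of the default bucket assignment plus a pluggable hashing hook; the BucketHasher name is an assumption:

```java
// Illustrative bucket-assignment logic: the default uses the record key's Java
// hashCode, and developers could plug in their own function via this interface.
public interface BucketHasher {

  int bucketFor(String recordKey, int numBuckets);

  // Possible default: plain Java hashCode, masked to stay non-negative.
  BucketHasher JAVA_HASH = (recordKey, numBuckets) ->
      (recordKey.hashCode() & Integer.MAX_VALUE) % numBuckets;
}
```

For example, BucketHasher.JAVA_HASH.bucketFor("some-record-key", 1000) always returns the same bucket index for the same key, which is what lets the bucket mapping stay fixed for the lifetime of the dataset.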

HFile scan vs seek 

The micro-benchmarking shows that for an HFile containing 1M entries, random seeks perform better up to roughly 300k to 400k lookups; beyond that, a full file scan (i.e. reading the entire HFile into an in-memory hash table and doing lookups against it) performs better. We could leverage this during lookups by storing the total number of entries in each HFile: during a lookup, if the number of entries to be looked up is below ~30% of the total, we do random seeks, otherwise we resort to a full scan. This needs to be fleshed out in more detail, but for now we can introduce two configs, namely "record.level.index.dynamically.choose.scan.vs.seek" and "record.level.index.do.random.lookups". If the first config is set to true, scan or seek is chosen dynamically; if set to false, the value of the second config is used, which gives developers of streaming use cases the ability to choose scan or seek while reading.
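
A sketch of how a reader could pick between random seeks and a full scan under these two configs; the class, the method, and the exact threshold are assumptions:

```java
// Illustrative decision between random seeks and a full HFile scan, driven by
// the fraction of the file's entries that the current batch needs to look up.
public class LookupStrategyChooser {

  // ~30% threshold suggested by the micro-benchmark; would be tunable in practice.
  private static final double SEEK_TO_SCAN_RATIO = 0.30;

  public enum Strategy { RANDOM_SEEK, FULL_SCAN }

  public static Strategy choose(boolean chooseDynamically,   // "record.level.index.dynamically.choose.scan.vs.seek"
                                boolean preferRandomLookups, // "record.level.index.do.random.lookups"
                                long keysToLookUp,
                                long totalEntriesInHFile) {
    if (!chooseDynamically) {
      // Streaming use cases: honor the statically configured preference.
      return preferRandomLookups ? Strategy.RANDOM_SEEK : Strategy.FULL_SCAN;
    }
    double ratio = (double) keysToLookUp / Math.max(1L, totalEntriesInHFile);
    return ratio < SEEK_TO_SCAN_RATIO ? Strategy.RANDOM_SEEK : Strategy.FULL_SCAN;
  }
}
```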

Scaling further

This section is still being fleshed out. Listing the possible options for now.

In general, it is a good practice to over-provision by 30% to avoid scaling beyond the initial number of buckets, because there is some trade-off or overhead involved in scaling beyond the number of buckets initially allocated. The first version of the implementation may not address this and expects users to over-provision.

Option 1: Adding file groups to existing buckets

In this approach, we create multiple file groups within one bucket to scale further. A file group represents a group of HFiles forming a single unit that can be compacted into one base HFile; so each file group will have a base HFile.

Let's say that, with initial estimates, 1000 buckets were pre-allocated. At some point, if we reach the tipping point, i.e. 1M entries per bucket, we can cap that file group and start a new one. Remember that with compaction in place, all HFiles per file group will be compacted into one HFile. So, if we don't expand to another file group, at some point we could have 2M entries or more in one HFile, which might give us bad performance. Hence we ought to cap a file group at, say, 1M entries and start another file group. What this essentially means is that the bucket of interest becomes equivalent to two virtual buckets: the hashing and the number of buckets remain the same, but the index is capable of scaling to more entries. Whenever a new file group is created, the older one is sealed, i.e. it will not take any new writes; in other words, writes append to the latest file group. Reads also do not change, and all HFiles can still be looked up in parallel.
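
A compact sketch of this rollover rule, assuming a simple per-bucket entry counter; the FileGroup type and the 1M cap are illustrative:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative rollover of file groups within a single bucket: the latest file
// group takes all new writes until it reaches the cap, then it is sealed and a
// fresh file group is started.
public class BucketFileGroups {

  private static final long MAX_ENTRIES_PER_FILE_GROUP = 1_000_000L; // assumed cap

  static class FileGroup {
    long entryCount;
    boolean sealed;
  }

  private final List<FileGroup> fileGroups = new ArrayList<>();

  FileGroup fileGroupForNewWrites(long incomingEntries) {
    FileGroup latest = fileGroups.isEmpty() ? null : fileGroups.get(fileGroups.size() - 1);
    if (latest == null || latest.sealed
        || latest.entryCount + incomingEntries > MAX_ENTRIES_PER_FILE_GROUP) {
      if (latest != null) {
        latest.sealed = true;       // the older file group stops taking new writes
      }
      latest = new FileGroup();     // start a new file group for this bucket
      fileGroups.add(latest);
    }
    latest.entryCount += incomingEntries;
    return latest;
  }
}
```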

Option 2: Multiple hash lookups and bucket groups

The first hash can map to bucket indices 1 to 1000 (let's call this a bucket group). Once we hit 80% load on this bucket group, we could come up with a new hash that maps to bucket indices 1001 to 2000. During index lookup, every record then goes through two lookups, one bucket index per bucket group, and at most one of them should return a value if it exists. New writes will go into bucket indices 1001 to 2000.
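
An illustrative sketch of the two-level lookup across bucket groups; the Group interface and its methods are assumptions:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative lookup across bucket groups: each group has its own hash
// function and bucket range, every group is probed, and at most one of them
// is expected to hold the key.
public class BucketGroupIndex {

  interface Group {
    int bucketFor(String recordKey);              // maps the key into this group's bucket range
    String lookup(String recordKey, int bucket);  // returns the location, or null if absent
  }

  private final List<Group> groups = new ArrayList<>();

  /** Probes every bucket group and returns the first (and expected only) hit, or null. */
  String lookup(String recordKey) {
    for (Group group : groups) {
      String location = group.lookup(recordKey, group.bucketFor(recordKey));
      if (location != null) {
        return location;
      }
    }
    return null;
  }
}
```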

Con: One drawback of this approach is the chance of skew, where a few buckets in the first bucket group grow to > 1M entries while others hold 100k or fewer. So we can't really wait until 80% load to create the new bucket group.

To be filled: Compare and contrast both approaches. 

Implementation specifics

As mentioned in the sections above, we need compaction from time to time to compact all HFiles for a given bucket. In order to re-use compaction without needing to change much of what exists today, we have come up with an Inline FileSystem that is capable of inlining any file format (parquet, HFile, etc.) within a given file; in this context it will be HFile. At a high level, this InlineFileSystem makes it possible to embed/inline an HFile within any generic file. A URL/path is generated for each such embedded HFile, and it can be used with the HFile reader as though it were a standalone HFile. Here is a pictorial representation of how a file with embedded HFiles might look.


[Figure: a data file with multiple embedded (inlined) HFiles]
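
Purely as an illustration (this is not the actual InlineFileSystem API or URI format), an embedded HFile can be addressed by the outer file's path plus the byte range it occupies:

```java
// Hypothetical addressing of an HFile embedded inside a larger file: a reader
// only needs the outer file's path plus the byte range of the inlined HFile to
// treat it as if it were a standalone file.
public class InlineSlice {

  final String outerFilePath;  // e.g. the delta file that contains the embedded HFile
  final long startOffset;      // byte offset of the embedded HFile within the outer file
  final long length;           // length in bytes of the embedded HFile

  InlineSlice(String outerFilePath, long startOffset, long length) {
    this.outerFilePath = outerFilePath;
    this.startOffset = startOffset;
    this.length = length;
  }

  /** Illustrative URI a reader could resolve back to (outer file, offset, length). */
  String toInlineUri() {
    return "inlinefs://" + outerFilePath + "?start_offset=" + startOffset + "&length=" + length;
  }
}
```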

In cloud data stores like S3, which do not support appends, we might have only one embedded HFile per data file.

With this in context, let's take a look at how the data layout might look.

Think of each bucket in this indexing scheme as synonymous with a file group (which contains the actual data) in a Hoodie partition. A typical partition in a MOR dataset might have one base file and N small delta files; envision a similar structure for each bucket in this index. Each bucket is expected to have one base file and N smaller delta files, each with an embedded HFile. Each new batch of ingestion will either append a new HFile to an existing delta file as a new data block, or create a new delta file and write the new HFile as its first data block. At regular intervals, compaction will pick up the base HFile and all the delta HFiles to create a new base file (with an embedded HFile) as the compacted version.

Here is an illustration of how the index might look within a single bucket, pre- and post-compaction:

[Figure: index layout within a single bucket, pre- and post-compaction]

Here is the same illustration for cloud stores like S3

[Figure: index layout within a single bucket on cloud stores like S3]


This structure gives us many benefits. Since async compaction has been battle-tested, we can reuse it with some minimal changes; instead of data files, it operates on embedded HFiles in this case. With this layout, it is easy to reason about rollbacks and commits, and we get the same file system views as a Hoodie partition. For instance, fetching new index entries after a given commit time, fetching the delta between two commit timestamps, point-in-time queries and so on are all possible with this layout.

Rollout/Adoption Plan

  • Existing users will have to perform a one-time migration of the existing index (bloom, user-driven partition path, etc.) to this index. We will look into writing a tool for this.
  • We can tee the writes and reads and do a controlled roll out until we gain enough confidence. Until then, we could compare read results from both old and new index and ensure there are no surprises.
  • Or we could write a tool for async index comparison. I am yet to look into the internals of the existing index, but if it supports incremental fetch, we could write a background script that fetches records from both old and new indices at regular intervals, only for the delta commits, and compares them.
  • This new index and the old index’s contracts will not change and that should be seamless to the client. 
  • We will need a migration tool to bootstrap the global index from the existing dataset.

Test Plan

<Describe in few sentences how the RFC will be tested. How will we know that the implementation works as expected? How will we know nothing broke?>

8 Comments

  1. can we also create a JIRA and DISCUSS thread?


  2. Let me add some POC/results from the UUID timestamp option, the RFC will be more complete and will start the discuss thread then.

  3. Irrespective of which option we choose, I have some questions
    1. a. I understand that we are mainly targeting non-partitioned datasets. But the impl is applicable to both partitioned and non-partitioned datasets, is that right?

       b. If someone passes partitionId along with the key, will any of these approaches have a faster lookup (compared to passing in just the key), or is it the same? Or, if someone passes partitionId, would we directly go with the current flow where all bloom indexes are looked up in the respective partition? But again, if we go with the current flow, wouldn't that be slower than the new approach, since the new approach will return the fileId, whereas in the existing approach we have to look up the bloom index of every file group?
    2. Whats the state of a deleted key in index? Do we remove it or maintain a marker that it is deleted?
    - Based on our offline discussion, an update after a delete for the same key should do the upsert to the same Partition and FileId. Can you confirm this?
    3. On that note, what's the expected behavior for a repeated delete? For instance, due to some issue in the pipeline, the same batch of deletes could be called twice (I guess this should be common). Assuming we remove from the index, here is what I see would happen: for the first batch, all entries are deleted and returned, whereas for the 2nd batch, all keys would return as non-existent since the index may not have these entries. We might do a dummy commit (this feature is getting added in my recent delete PR only). But wanted to see if we have thought this through.
    4. Not sure if this is naive question. But if the number of buckets is not going to change for a given data set and the hashing mechanism, can we store the hashed value as keys in index file rather than entire key(key, partition path, fileId) if we see some savings in bytes.

    To address this, one option I could think of is, instead of removing the entries from index totally, we could maintain a marker that the entry is deleted. But this depends on the workload. If there are very few deletes to the dataset, then we are good. But if the workload is such that, active data set is more like a window where we have large deletes from time to time, then not having these entries in index would bloat the index size by a lot.

    5. I feel 2nd approach has a pretty good advantage compared to first, since for an entire batch one look up is good enough. we can def cache entire set(from seq file) and speed it up.

    6. I have doubts on your "Estimated size of a large dataset". But before that, can you tell me whats the key and value in both approaches in index.

  4. >I understand that we are mainly targeting for non-partitoned data sets.

    Global vs Non-global is an orthogonal concept IMO. this just helps global index work/scale well



  5. sivabalan narayanan 

    Answers below

    1 (a) As Vinoth pointed out, this is targeted for non-partitioned datasets. But to your point, one could use this for partitioned datasets as well, although that isn't the intent here since for partitioned bloom index works and scales decently well and is very light weight.

    1 (b) Folks will be able to choose an index type → GLOBAL, BLOOM etc..the documentation will explain which ones are meant for partitioned vs non-partitioned etc.

    (2) & (3) For deletes, we will remove the index as well since there is no guarantee when an update could come along for that deleted row, we don't want to keep those indexes lying around. At that time, that update would be treated as a new insert (just like what would happen in a database). For repeated deletes (through the special api that you are implementing), it will have to be dummy commit.

    (4) Hashing is used to figure out which bucket the key lies in, there can be clashes in the hash leading to the same bucket, we need to concretely look at the tuple (key, partition path) to find the file id. File ID is not part of the key, it's the value.

    (5) Reads can be made fast, but to maintain the fast reads, we need to keep the bucket sorted (hence rewritten) everytime which can be expensive. We need to do some basic POC/back of the envelope calculation to prove this.

    (6) key = (key), value = (partition path, fileId) 

  6. sivabalan narayanan left some comments.. Approach LGTM high level

  7. Hi, Nishith Agarwal sivabalan narayanan Vinoth Chandar this is a great RFC
    A. We are also doing some research and design recently about global index and bucket.
    About this RFC, I have some questions and discussion points:
    1. Can this HFile-based global indexing replace the HBaseIndex in terms of functionality and performance?
    2. In this approach, will the index write not first write a WAL?
    3. If the concept of hash buckets is introduced, can we do more things?
       3.1 Can we plan to support range buckets, which could greatly improve query and analysis performance?
       3.2 Can we support concurrent writes between different writers when the data being written belongs to different buckets? (this issue https://issues.apache.org/jira/browse/HUDI-944 plans to do this)
    4. With the hash bucket mechanism on the write side, can we make the global index format pluggable, e.g. supporting both HFile and RocksDB?
    5. Also, I think that with the record index Hudi's write throughput will be greatly improved, because the overhead will only be looking up the index and writing the parquet or log files. The write side of Hudi could then run more lightweight, in one process.

    B. Our progress with global index Shaofeng Li
    Here is our progress researching and designing an embedded global index with RocksDB:
    1. We are testing RocksDB as the embedded global index (<hudiKey, partition & fileId>). We have some results: for example, sequentially getting 10,000 records from RocksDB with 80,000,000 base records costs 44 ms, on a machine with a SATA disk. This performance is good.

    2. For a Hudi client in one process, one RocksDB instance will be OK. The problem is that if we write many records, RocksDB will consume a lot of local disk space, but RocksDB supports syncing old segments to HDFS.

    3. Also, for scaling writes, we plan to use Spark to generate key buckets, and each key bucket's data will be sent to the corresponding Spark executor; one executor generates one RocksDB table.


    Overall, I think the HFile-on-DFS solution is more open, and the RocksDB solution depends on its sync to HDFS/S3. Can we participate in this RFC together?


  8. Totally. This work nicely ties back with other projects we have lined up. Love to work more closely. I will let Nishith Agarwal or sivabalan narayanan answer the questions on hfile approach, since they did a bunch of benchmarking there.

     On RocksDB, if we can get RocksDB to sync active segments to HDFS and cloud storage, it's a great option. I looked at a few options here - not sure if Facebook itself used RocksDB this way; rocksdb-cloud exists but only syncs older segments. The problem with having some index state local is that we could lose data if the Spark/Flink task was killed and retried on another node (something that routinely happens with batch jobs). Arguably we could checkpoint RocksDB at Hudi commit boundaries and roll back to the last checkpoint on failures, but again the cost may be high, as we would need to reprocess data. Love to hear your thoughts on these issues.