This RFC is deprecated. The updated version is here.
- Vinoth Chandar : [APPROVED/REQUESTED_INFO/REJECTED]
- Balaji Varadarajan : [APPROVED/REQUESTED_INFO/REJECTED]
Current state: "UNDER DISCUSSION"
Discussion thread: here
Released: <Hudi Version>
Hudi tables support many operations, a very popular one being upsert(). To support upserts, Hudi depends on an indexing scheme to route each incoming record to the correct file.
Currently, Hudi supports partitioned and non-partitioned datasets. A partitioned dataset is one which bucketizes groups of files (data) into buckets called partitions. A hudi dataset may be composed of N number of partitions with M number of files. This structure helps canonical hive/presto/spark queries to limit the amount of data read by using the partition as a filter. The value of the partition/bucket in most cases is derived from the incoming data itself. The requirement is that once a record is mapped to a partition/bucket, this mapping should be a) known to hudi b) should remain constant for the lifecycle of the dataset for hudi to perform upserts on them.
Consequently, in a non-partitioned dataset one can think of this problem as a record key <-> file id mapping that is required for hudi to be able to perform upserts on a record.
The current solution is either a) for the client/user to provide the correct partition value as part of the payload or b) to use a GlobalBloomIndex implementation to scan all the files under a given path (say, a non-partitioned table). In both cases, we are limited either by the ability of the user to provide this information or by the performance overhead of scanning the bloom index of every file.
The proposal is to implement a new index format that is a mapping of (recordKey <-> partition, fileId) or ((recordKey, partitionPath) → fileId). This mapping will be stored and maintained by Hudi as another implementation of HoodieIndex and will address the 2 limitations mentioned above.
Types of datasets
HUDI storage abstraction is composed of 2 main components : 1) The actual data stored 2) An index that helps in looking up the location (file_Id) of a particular record key. Without this information, HUDI cannot perform upserts to datasets. We can broadly classify all datasets ingested in the data lake into 2 categories.
Insert or event data is one where every new entry written to that table is mutually disjoint from any other entry written previously to that table. More concretely, each row in the table is a new row and has no overlap with a previously written row. You can imagine a table that is ingesting logs from an application, each new log is a new entry into the table, with little or no relation to the log entries written before it. A new entry into such a dataset hence does not require ANY context from the previous entries in the dataset to determine where this entry should be written to.
Upsert/Change Log data
An upsert/change log dataset presents a different challenge. Any entry written to such a dataset may depend on or belong to a previously written row. More concretely, each row in the table is NOT necessarily a new row and may overlap with a previously written row. In such a scenario, the system needs to determine which row the new update should be written to, and hence which file id the update should be routed to.
The 3 approaches used by consumers of HUDI are as follows:
- The data layout is divided into buckets called partitions, and each partition consists of N files. The client maintains a mapping of record key <-> file id for updatable tables and provides the partition information before passing off the records to Hudi for processing. The HoodieBloomIndex implementation scans the bloom indexes of all files under a partition and, on a match, verifies that the record key actually lies in that file. This way Hoodie is able to “tag” the incoming records with the correct file ids.
- The data layout is a flat structure, where a single directory contains all the files for the updatable table. The GlobalHoodieBloomIndex implementation scans the bloom indexes of all files and, on a match, verifies that the record key actually lies in that file. This way Hoodie is able to “tag” the incoming records with the correct file ids. The difference from the first approach is that, if the number of files under this base path is very large, the indexing time can blow up.
- For append-only datasets which do not require updating, simply use the partition from the current payload, e.g. the current timestamp.
Irrespective of the type of dataset (append or append + update) in use, index lookup plays a critical role in both read and write performance. If we can solve record level indexing mapped to FileId, and possibly partition (depending on the use case), without adding too much latency, it will make Hudi more performant.
So this RFC aims at providing a record level indexing capability to Hoodie for faster lookups.
Note: For explanation purposes, let's consider a non-partitioned dataset, so the key is recordKey and the value is (PartitionPath, FileId).
Hash based indexing
Index entries are hashed into buckets and each bucket will hold a mapping of recordKey to <PartitionPath, FileId>. The total number of buckets has to be pre-determined for the dataset and can't be updated later. But each bucket can scale to more than one file group based on the load. More on file groups later in the section. With 1000 buckets and 1M entries per bucket, the index is capable of serving 1 billion entries. So, more file groups per bucket are required only if the dataset grows beyond 1B unique entries.
Each bucket will expose two APIs, namely getRecordLocations(JavaRDD<RecordKeys>) and insertRecords(JavaPairRDD<RecordKey, Tuple2<PartitionPath, FileId>>).
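A minimal in-memory sketch of the two bucket APIs may help make the contract concrete. It assumes plain Python collections in place of JavaRDD/JavaPairRDD and a dict in place of the HFiles; the names `Bucket`, `insert_records` and `get_record_locations` are illustrative only and are not Hudi's API.

```python
from typing import Dict, Iterable, Optional, Tuple

Location = Tuple[str, str]  # (partition_path, file_id)


class Bucket:
    """Hypothetical stand-in for one index bucket; a dict replaces the HFiles."""

    def __init__(self) -> None:
        self._entries: Dict[str, Location] = {}

    def insert_records(self, records: Iterable[Tuple[str, Location]]) -> None:
        # Mirrors insertRecords(JavaPairRDD<RecordKey, Tuple2<PartitionPath, FileId>>).
        for record_key, location in records:
            self._entries[record_key] = location

    def get_record_locations(
        self, record_keys: Iterable[str]
    ) -> Dict[str, Optional[Location]]:
        # Mirrors getRecordLocations(JavaRDD<RecordKeys>); None means "not indexed".
        return {key: self._entries.get(key) for key in record_keys}


bucket = Bucket()
bucket.insert_records([("uuid-1", ("2020/01/01", "file-7"))])
locations = bucket.get_record_locations(["uuid-1", "uuid-2"])
```

Here `locations` maps `uuid-1` to its (partition path, file id) pair and `uuid-2` to `None`, which is how a caller would tell an update apart from a fresh insert.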
We plan to use HFile (link1, link2) for storage to leverage its random read capability. You can find details about HFile benchmarking here. In short, with an HFile containing 1M entries, a lookup of 100k entries takes ~600 ms at p95. If we can achieve the same in a live system, it would be great for Hudi.
Index Write path(to be used in Hoodie Write path)
On the ingest path, once the HoodieRecordLocation is obtained for all new (to be inserted) entries, these records are mapped into (RecordKey, <PartitionPath, FileId>). These are hashed based on recordKey and mapped to a bucket. The bucket mapping of a record can never change, and hence neither can the hashing function for any given dataset. Each bucket consists of N HFiles, because all writes to a single HFile have to be sorted, so every new batch will end up creating a new HFile in the respective bucket. We definitely need compaction to keep the number of HFiles in check. Also, do remember that there are no updates to values in this index, since a record's location in Hoodie can never be changed. This property will help us parallelise the read path.
Parallelism: Ideal parallelism for write is the total number of partitions, since each batch of ingest can create one HFile per bucket at max.
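The write path described above can be sketched as a grouping step: hash each entry to a bucket, then sort each bucket's entries by record key so they can be written as one new HFile. This is a sketch under stated assumptions; `plan_index_write` is a hypothetical helper, and CRC32 is only an assumed stable hash (the RFC requires just that the function never changes for a dataset).

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Tuple

Entry = Tuple[str, Tuple[str, str]]  # (record_key, (partition_path, file_id))


def plan_index_write(batch: List[Entry], num_buckets: int) -> Dict[int, List[Entry]]:
    """Group one ingest batch by bucket and sort each group by record key.

    Each returned list stands for the contents of one new HFile in that bucket.
    """
    per_bucket: Dict[int, List[Entry]] = defaultdict(list)
    for record_key, location in batch:
        # Assumed hash: CRC32 of the UTF-8 key, reduced modulo the bucket count.
        bucket_id = zlib.crc32(record_key.encode("utf-8")) % num_buckets
        per_bucket[bucket_id].append((record_key, location))
    # HFile writes must be sorted, so order every group by record key.
    return {
        bucket_id: sorted(entries, key=lambda entry: entry[0])
        for bucket_id, entries in per_bucket.items()
    }
```

Because one batch produces at most one sorted list per bucket, the number of write tasks is bounded by the number of buckets touched by the batch.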
A few things that need attention during implementation: the data write and the index write should be coupled and ACID compliant, i.e. either both are committed or both are rolled back. No partial data should be seen by any reader midway.
As of today, Hoodie's record location is immutable, but we can't say that will hold true in the future. So, this index should be capable of handling updates to the mapping. In such cases, multiple values will be returned (e.g. if HFile1 returns FileId1 for record1 and HFile3 returns FileId5 for record1, we would take HFile3's value, and hence record1's location is FileId5). For the commit timestamp, we should rely on either the filename or the commit metadata instead of embedding the commit timestamp in the value of every entry, which would bloat the size of the index.
Index Read path (to be used in Hoodie read/update path)
On both the read and update paths, each record's location has to be known before proceeding to read or write. So, getRecordLocations(JavaRDD<RecordKeys>) will be called with the record keys from the input batch. These records will be hashed into their respective buckets and each bucket in turn will do the HFile lookups.
Parallelism: As noted earlier, since there are no updates to entries in the index, a single record can only be present in one HFile within a bucket. So, ideally every HFile could be looked up in parallel. For instance, if we have 100 buckets with 10 HFiles in each, we could set parallelism to 1000 to get the maximum benefit.
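To illustrate the parallelism argument, here is a small sketch that treats every (bucket, HFile) pair as an independent lookup task, so 100 buckets with 10 HFiles each would yield 1000 tasks. It assumes dicts in place of HFiles and a thread pool in place of Spark tasks; `lookup_parallel` is a hypothetical name.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Optional

# bucket id -> list of "HFiles"; each HFile is a dict of record_key -> file_id.
Index = Dict[int, List[Dict[str, str]]]


def lookup_parallel(
    index: Index, keys_by_bucket: Dict[int, List[str]], parallelism: int
) -> Dict[str, Optional[str]]:
    """Probe every HFile of every requested bucket as an independent task."""
    tasks = [
        (hfile, keys)
        for bucket_id, keys in keys_by_bucket.items()
        for hfile in index.get(bucket_id, [])
    ]

    def probe(task):
        hfile, keys = task
        return {key: hfile[key] for key in keys if key in hfile}

    # Start with "not found" for every requested key.
    found: Dict[str, Optional[str]] = {
        key: None for keys in keys_by_bucket.values() for key in keys
    }
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        for partial in pool.map(probe, tasks):
            # Merging is order-independent only while a key lives in one HFile.
            found.update(partial)
    return found
```

The simple merge at the end relies on the "one HFile per key" property; once updates or deletes are allowed, the results need a latest-wins resolution step instead.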
Index Delete(to be used in Hoodie write path)
We can use a typical tombstone, adding an entry to the bucket with a null value. During index lookup, we could still maintain the same parallelism and look up all HFiles within a given bucket in parallel, but choose the latest value if multiple values are returned (e.g. if HFile1 returns FileId1 for record1 and HFile3 returns null for record1, we would take HFile3's value, and hence record1 is considered deleted). For the commit timestamp, we should rely on either the filename or the commit metadata instead of embedding the commit timestamp in the value of every entry, which would bloat the size of the index.
Adding delete support might complicate compaction a bit. Compaction might have to read the entire content of all HFiles that need to be compacted, to find the resolved values for all entries before writing to the new base HFile. Hopefully this will not add too much overhead. Also, compaction will drop deleted entries to avoid unnecessary storage for tombstones. So, it may not be possible to determine whether a record was never inserted or was deleted after insertion. Let me know if this is an issue with Hoodie.
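The latest-wins rule can be sketched as follows. It assumes each hit carries a commit time taken from the file name or commit metadata (not from the index entry), that commit times sort correctly as strings, and that `None` stands for a tombstone; `resolve` is a hypothetical helper.

```python
from typing import List, Optional, Tuple

# (commit_time, value) pairs returned by the HFiles of one bucket for one key.
# value is None for a tombstone.
Hit = Tuple[str, Optional[str]]


def resolve(hits: List[Hit]) -> Optional[str]:
    """Return the value from the most recent commit, or None if deleted or absent."""
    if not hits:
        return None
    _, latest_value = max(hits, key=lambda hit: hit[0])
    return latest_value
```

With hits `[("001", "FileId1"), ("003", None)]` the record resolves to `None` (deleted), and with `[("001", "FileId1"), ("003", "FileId5")]` it resolves to `FileId5`, which matches the two examples in the text.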
Note: One point to remember is that, we should also support re-insertion of deleted entries. Above mentioned scheme would work for this use-case as well w/o any modifications.
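A rough sketch of compaction with tombstones, assuming dicts in place of HFiles and that the caller passes files ordered oldest first (by commit time). `compact` is a hypothetical helper, not the existing Hudi compactor.

```python
from typing import Dict, List, Optional

HFile = Dict[str, Optional[str]]  # record_key -> file_id, None marks a tombstone


def compact(hfiles_oldest_first: List[HFile]) -> HFile:
    """Merge HFiles into one base file.

    Keeps the newest value per key and drops keys whose newest value is a
    tombstone. Keys are emitted in sorted order, as an HFile requires.
    """
    merged: HFile = {}
    for hfile in hfiles_oldest_first:
        merged.update(hfile)  # later files overwrite earlier ones
    return {key: merged[key] for key in sorted(merged) if merged[key] is not None}
```

Re-insertion needs no special handling in this sketch: an insert that follows a tombstone simply becomes the newest value and survives compaction.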
As a default implementation, we could just hash the record key as is (Java hash). But we should also add support for developers to define their own hashing function.
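One way to picture a default hash with a pluggable override is sketched below. `java_string_hash` reproduces Java's `String.hashCode()` for keys made of Basic Multilingual Plane characters (an assumption; Java hashes UTF-16 code units), and `bucket_for` with its `hash_fn` parameter is a hypothetical extension point.

```python
from typing import Callable


def java_string_hash(key: str) -> int:
    """Reproduce Java's String.hashCode() for keys in the Basic Multilingual Plane."""
    h = 0
    for ch in key:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF  # wrap like a 32-bit int
    return h - (1 << 32) if h >= (1 << 31) else h  # reinterpret as signed


def bucket_for(
    key: str, num_buckets: int, hash_fn: Callable[[str], int] = java_string_hash
) -> int:
    # Python's % is a floor modulo, so negative hashes still map to 0..num_buckets-1.
    return hash_fn(key) % num_buckets
```

Whatever function is plugged in has to stay fixed for the lifetime of the dataset, since a record's bucket can never change.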
HFile scan vs seek
The microbenchmarking shows that, for an HFile containing 1M entries, random seeks perform better up to about 300k to 400k lookups. Beyond that, a full file scan (i.e. reading the entire HFile into an in-memory hash table and doing the lookups against the hash table) performs better. So, we could leverage that during our lookup. We could store the total number of entries in each HFile. During lookup, if the number of entries to be looked up is < 30% of the entries in the HFile, we could do random lookups; if not, we could resort to a full file scan. This needs to be fleshed out in finer detail though. But for now, we can introduce two configs, namely "record.level.index.dynamically.choose.scan.vs.seek" and "record.level.index.do.random.lookups". If the first config is set to true, scan or seek will be chosen dynamically. If it is set to false, as for streaming use cases, the value of the second config will be used. For streaming use cases, the developer thus has the ability to choose scan or seek while reading.
This section is still being fleshed out. Listing the possible options for now.
In general, it's good practice to over-provision by 30% to avoid scaling beyond the initial number of buckets, because there is some trade-off or overhead involved in scaling beyond the number of buckets initially allocated. The first version of the implementation may not address this and expects users to over-provision.
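The decision logic for the two configs could look roughly like this. The function name, parameter names and the 0.30 default are assumptions for illustration; only the two config keys quoted in the comments come from the text.

```python
def use_random_seek(
    num_lookups: int,
    entries_in_hfile: int,
    dynamically_choose: bool = True,  # record.level.index.dynamically.choose.scan.vs.seek
    do_random_lookups: bool = True,   # record.level.index.do.random.lookups
    seek_threshold: float = 0.30,     # assumed cut-off from the microbenchmark
) -> bool:
    """Return True to use random seeks, False to scan the whole HFile."""
    if not dynamically_choose:
        # Static mode: the second config decides.
        return do_random_lookups
    if entries_in_hfile <= 0:
        # Nothing to scan; either strategy is trivial.
        return True
    return num_lookups / entries_in_hfile < seek_threshold
```

For a 1M-entry HFile this picks seeks for 100k lookups and a full scan for 400k lookups, in line with the benchmark summary.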
Option1: Adding file groups to existing buckets
In this approach, we will create multiple file groups within one bucket to scale further. A file group represents groups of HFiles forming a single group which could be compacted to one base HFile. So, each file group will have a base HFile.
Let's say that, with initial estimates, 1000 buckets were pre-allocated. At some point, if we are reaching the tipping point, i.e. 1M entries per bucket, we can cap that file group and start a new file group. Remember that with compaction in place, all HFiles in a file group will be compacted to one HFile. So, if we don't expand to another file group, at some point we could have 2M entries or more in one HFile, which might give us bad performance. So, we ought to cap one file group at, say, 1M entries and start another file group. What this essentially means is that the bucket of interest becomes equivalent to two virtual buckets. The hashing and the number of buckets remain the same, but our index is capable of scaling to a larger number of entries. Whenever a new file group is created, the older one is sealed, i.e. it will not take any new writes. In other words, writes append to the latest file group. Reads also do not change, and lookups can proceed in parallel across all HFiles.
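A small sketch of the capping rule for Option 1, tracking only entry counts per file group. `BucketFileGroups` is a hypothetical class; it assumes a batch is always written whole to the open file group, so a group can overshoot the cap by up to one batch.

```python
from typing import List

MAX_ENTRIES_PER_FILE_GROUP = 1_000_000  # cap suggested in the text


class BucketFileGroups:
    """Tracks entry counts per file group in one bucket; only the last group is writable."""

    def __init__(self, cap: int = MAX_ENTRIES_PER_FILE_GROUP) -> None:
        self.cap = cap
        self.group_sizes: List[int] = [0]

    def append(self, num_new_entries: int) -> int:
        """Record a new batch and return the index of the file group that receives it."""
        if self.group_sizes[-1] >= self.cap:
            self.group_sizes.append(0)  # seal the full group and open a new one
        self.group_sizes[-1] += num_new_entries
        return len(self.group_sizes) - 1
```

Lookups are unaffected by this bookkeeping: they still probe every HFile of every file group in the bucket.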
Option2: Multiple hash look ups and bucket groups
The first hash can result in bucket indices 1 to 1000 (let's call this a bucket group). Once we hit 80% load on this bucket group, we could come up with a new hash that results in bucket indices 1001 to 2000. So, during index lookup, all records will go through two lookups. This will return one bucket index per bucket group, and at most one of them should return a value, if the record exists. New writes will go into bucket indices 1001 to 2000.
Con: One drawback with this approach is the chance of skew, wherein a few buckets in the first bucket group grow to > 1M entries while others hold 100k or fewer. So, we can't really wait until 80% load to create a new bucket group.
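Option 2 can be sketched as one hash per bucket group, each yielding a candidate bucket index in that group's range. Seeding CRC32 with the group number is purely an assumption to get a distinct hash per group; `candidate_buckets` is a hypothetical helper.

```python
import zlib
from typing import List

BUCKETS_PER_GROUP = 1000


def candidate_buckets(record_key: str, num_groups: int) -> List[int]:
    """Return one 1-based bucket index per bucket group, oldest group first."""
    data = record_key.encode("utf-8")
    buckets = []
    for group in range(num_groups):
        # Assumed per-group hash: CRC32 started from the group number.
        offset = zlib.crc32(data, group) % BUCKETS_PER_GROUP
        buckets.append(group * BUCKETS_PER_GROUP + offset + 1)
    return buckets
```

A lookup would probe every bucket in the returned list, while a new write would go only to the last one, i.e. the newest bucket group.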
To be filled: Compare and contrast both approaches.
As mentioned in the sections above, we need compaction from time to time to compact all HFiles for a given bucket. In order to re-use compaction without changing a whole lot from what it is today, we have come up with an Inline FileSystem, which is capable of inlining any file format (Parquet, HFile, etc.) within a given file. In this context, it is going to be HFile. At a high level, this InlineFileSystem makes it possible to embed/inline an HFile within any generic file. A URL/path will be generated for each such embedded HFile, and it can be used with the HFile reader as though it were a standalone HFile. Here is a pictorial representation of what a file with embedded HFiles might look like.
In cloud data stores like S3 (which do not support appends), we might have only one embedded HFile per data file.
With this context, let's take a look at what the data layout might look like.
Think of each bucket in this indexing scheme as synonymous with a file group (which contains the actual data) in a Hoodie partition. A typical partition in a MOR dataset might have one base file and N small delta files. Envision a similar structure for each bucket in this index. Each bucket is expected to have one base file and N smaller delta files, each having an embedded HFile. Each new batch of ingestion will either append a new HFile to an existing delta file as a new data block or create a new delta file and write the new HFile as the first data block. At regular intervals, compaction will pick up the base HFile and all the delta HFiles to create a new base file (with an embedded HFile) as a compacted version.
Here is an illustration of what the index might look like within a single bucket, pre and post compaction
Here is the same illustration for cloud stores like S3
This structure gives us many benefits. Since async compaction has been battle tested, with some minimal changes, we can reuse the compaction. Instead of data files, it is embedded hFiles in this case. With this layout, it is easy to reason about rollbacks and commits. And we get the same file system views similar to a hoodie partition. For instance, fetching new index entries after a given commit time, fetching delta between two commit timestamps, point in time query and so on are possible with this layout.
- Existing users will have to perform a one-time migration of the existing index (bloom, user-driven partition path, etc.) to this index. We will look into writing a tool for this.
- We can tee the writes and reads and do a controlled roll out until we gain enough confidence. Until then, we could compare read results from both old and new index and ensure there are no surprises.
- Or we could write a tool for async index comparison. I am yet to look into internals of existing index, but if it supports incremental fetch, we could write a background script which will fetch records from both old and new indices at regular intervals only for the delta commits and compare them.
- This new index and the old index’s contracts will not change and that should be seamless to the client.
- We will need a migration tool to bootstrap the global index from the existing dataset.
<Describe in few sentences how the RFC will be tested. How will we know that the implementation works as expected? How will we know nothing broke?>