
Proposers

Approvers

Status

Current state: UNDER DISCUSSION

Discussion thread: here

JIRA: here

Released: <Hudi Version>

Abstract

In the current implementation, the HUDI Writer Client (in the write path) and HUDI queries (through the InputFormat in the read path) have to perform a “list files” operation on the file system to get its current view. In HDFS, listing all the files in a dataset is a NameNode-intensive operation for large datasets. For example, one of our HUDI datasets has thousands of date partitions, with each partition holding thousands of data files.

With this effort, we want to:

  1. Eliminate the requirement for the “list files” operation
    1. This will be done by proactively maintaining metadata about the list of files
    2. Reading the file list from a single file should be faster than issuing a large number of NameNode operations
  2. Create Column Indexes for better query planning and faster lookups by Readers
    1. For a column in the dataset, min/max range per Parquet file can be maintained.
    2. Just by reading this index file, the query planning system should be able to get the view of potential Parquet files for a range query.  
    3. Reading Column information from an index file should be faster than reading the individual Parquet Footers.

This should provide the following benefits:

  1. Reducing the number of file listing operations improves NameNode scalability and reduces NameNode burden.
  2. Query planning is optimized, as it is done by reading a single metadata file and its cost remains mostly bounded regardless of the size of the dataset
  3. Allows partition-path-agnostic queries to be performed efficiently


In this document we describe two approaches for implementing this functionality. We intend to finalize on a single approach after discussion and some workflow testing on a sample dataset.

Background

<Introduce any background context which is relevant or necessary to understand the feature and design choices.>

Implementation Approach #1: Layered metadata (two levels)

With this approach, Hudi file system metadata is maintained at two levels, in master manifest and partition manifest files, adhering to the Hudi MVCC design.


Master Manifest

The master manifest file holds a “partition metadata” record for each partition in the dataset. Each “partition metadata” record holds the name of the partition and optional aggregated metadata for that partition. For example, if min/max range metadata is to be maintained, each “partition metadata” record in the master manifest would contain <partition name, <partition-level min, max> for each column in the dataset>.

To support versioning operations (rollback, restore, savepoint, etc.), the master manifest could maintain a map of {commitId -> partition metadata records for all partitions affected by that commit} for every commit in the active timeline. For partitions created/updated by commits that have since been archived, the master manifest maintains a consolidated list of “partition metadata” records.

Master manifest file size

For a sample dataset with 100 columns, 2000 date partitions, ~1000 data files per partition, Hudi maintaining 48 commits in the active timeline (one day’s worth of commits at a 30-minute cadence), each commit touching roughly 50 partitions, an average record size of 5 KB, and “partition metadata” records containing <min/max> values for each column:

  • The master manifest file will have ~2000 “partition metadata” records.
  • Each “partition metadata” record is expected to be about 2x the average record size of the dataset (for the sample dataset, ~10 KB per record).
  • To support easier versioning operations, for every commit in the active timeline the master manifest would also maintain “partition metadata” records for all partitions affected by that commit (48 commits * 50 affected partitions = 2400 records).
  • The master manifest file would therefore be ~45 MB if per-commit metadata is maintained (10 KB per “partition metadata” record * (2000 + 2400) records), and ~20 MB if it is not. A worked calculation is sketched below.
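As a rough cross-check of the arithmetic above, the following sketch recomputes the estimates; all constants are assumptions taken from the sample dataset described above, not measurements.

// Back-of-the-envelope sizing for the master manifest of the sample dataset.
// All constants below are the assumptions stated above, not measured values.
public class MasterManifestSizing {
    public static void main(String[] args) {
        long partitions = 2_000;                       // date partitions in the dataset
        long commitsInActiveTimeline = 48;             // one day of commits at a 30-minute cadence
        long partitionsTouchedPerCommit = 50;          // rough estimate per commit
        long avgRecordSizeBytes = 5_000;               // ~5 KB average record size
        long partitionMetadataRecordBytes = 2 * avgRecordSizeBytes;  // ~10 KB per record

        long baseRecords = partitions;                                                 // one record per partition
        long perCommitRecords = commitsInActiveTimeline * partitionsTouchedPerCommit;  // 2400 records

        long withPerCommitMetadata = (baseRecords + perCommitRecords) * partitionMetadataRecordBytes;
        long withoutPerCommitMetadata = baseRecords * partitionMetadataRecordBytes;

        System.out.printf("Master manifest with per-commit metadata: ~%d MB%n",
                withPerCommitMetadata / 1_000_000);    // ~44 MB
        System.out.printf("Master manifest without per-commit metadata: ~%d MB%n",
                withoutPerCommitMetadata / 1_000_000); // ~20 MB
    }
}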

Partition Manifest file

The partition manifest file holds a “datafile metadata” record for every data file under the partition path. The partition manifest file can be located under the partition path itself (or alternatively under a central location such as .metadatastore).

Each “datafile metadata” record holds the name of the data file and the corresponding aggregated metadata for that file. For example, if min/max range metadata is to be maintained, each “datafile metadata” record in the partition manifest would contain <data file name, <data-file-level min, max> for each column in the dataset>.

Partition Manifest file size

For the same sample dataset (100 columns, 2000 date partitions, ~1000 data files per partition, 48 commits in the active timeline, roughly 50 partitions touched per commit, 5 KB average record size) with “datafile metadata” records maintaining <min/max> values for each column:

  • Each partition manifest will have ~1000 “datafile metadata” records.
  • Each “datafile metadata” record is expected to be about 2x the average record size of the dataset (for the sample dataset, ~10 KB per record).
  • Each partition manifest file would therefore be ~10 MB in size (10 KB per “datafile metadata” record * 1000 records).
  • A dataset with ~2000 partitions would contain ~2000 such partition manifest files (one per partition).


Implementation details

Ingestion of records (write path)

When ingesting records into the dataset (bulk inserts/upserts), as new data files (or new versions of data files) are created, the executors working on a partition update the “datafile metadata” records in the corresponding partition manifest. Updating the partition manifest requires reading the contents of the file, appending new records for the created/modified data files (both parquet and .log files), and writing the updated data back to the partition manifest file, as sketched below.
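A minimal sketch of this read-append-write cycle follows; DataFileMetadata and the in-memory lists are hypothetical placeholders rather than actual Hudi classes, and real code would go through Hudi's storage and MVCC layers. The same per-file ranges can be rolled up into the partition-level ranges stored in the master manifest.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: DataFileMetadata and the in-memory lists are hypothetical
// placeholders, not actual Hudi classes.
class DataFileMetadata {
    final String fileName;                     // parquet or .log file created/updated by the commit
    final Map<String, long[]> columnRanges;    // column name -> {min, max}, assuming numeric columns

    DataFileMetadata(String fileName, Map<String, long[]> columnRanges) {
        this.fileName = fileName;
        this.columnRanges = columnRanges;
    }
}

public class PartitionManifestUpdate {
    // Read-append-write cycle: take the current records, append records for the files
    // touched by the commit, and return the list that is written back as the new manifest.
    static List<DataFileMetadata> applyCommit(List<DataFileMetadata> current,
                                              List<DataFileMetadata> touchedByCommit) {
        List<DataFileMetadata> updated = new ArrayList<>(current);
        updated.addAll(touchedByCommit);
        return updated;
    }

    // Partition-level min/max per column, as would be rolled up into the master manifest.
    static Map<String, long[]> aggregateRanges(List<DataFileMetadata> records) {
        Map<String, long[]> partitionRanges = new HashMap<>();
        for (DataFileMetadata record : records) {
            for (Map.Entry<String, long[]> e : record.columnRanges.entrySet()) {
                partitionRanges.merge(e.getKey(), e.getValue().clone(),
                        (a, b) -> new long[] {Math.min(a[0], b[0]), Math.max(a[1], b[1])});
            }
        }
        return partitionRanges;
    }
}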

Query handling (read path)

From a reader/query engine’s perspective, the InputFormat’s “listStatus” requests will be served with the current view of the file system without expensive distributed file system queries.

Maintaining additional metadata in the manifest files helps Hudi support faster queries and optimize the dataset for complex queries. During the planning phase of a query from Hive/Presto, complex queries can be optimized through the supported Hudi input formats using the metadata available in the master manifest and partition manifest files.

For example, when handling a complex query such as “SELECT * FROM table WHERE x1 < C1 < y1 AND x2 < C2 < y2 …. ;”:

  • Hudi will inspect the master manifest file to prune all partitions that don’t match the query range (for example, prune all partitions that don’t satisfy x1 < C1 < y1), then apply the second and subsequent predicates to prune the partition list further.
  • Once the list of “candidate partitions” has been identified, Hudi will inspect the corresponding partition manifest files to prune the data files that don’t match the query range, producing the “candidate data files” where the data is most likely to reside. A sketch of this pruning follows the list.
  • For the most frequently used queries (learned over time for each dataset), Hudi could optimize further by maintaining a list of “hot columns” and maintaining column-specific metadata (such as range/index files).
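A minimal sketch of this two-level pruning, assuming numeric column ranges and hypothetical map-based stand-ins for the metadata read from the master and partition manifests:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of two-level pruning for "WHERE x1 < C1 < y1" style predicates.
// The Map arguments are hypothetical in-memory stand-ins for the metadata read from the
// master manifest (partition -> range) and a partition manifest (data file -> range).
public class RangePruning {

    // An entry can be skipped only if its stored [min, max] range cannot overlap (x, y).
    static boolean mayContain(long[] minMax, long x, long y) {
        return minMax[1] > x && minMax[0] < y;
    }

    // Works for both levels: pass partition-level ranges to get candidate partitions,
    // then file-level ranges of each candidate partition to get candidate data files.
    static List<String> pruneByRange(Map<String, long[]> rangesForColumn, long x, long y) {
        List<String> candidates = new ArrayList<>();
        for (Map.Entry<String, long[]> e : rangesForColumn.entrySet()) {
            if (mayContain(e.getValue(), x, y)) {
                candidates.add(e.getKey());
            }
        }
        return candidates;
    }
}

For multi-column predicates, the candidate sets produced for each column are intersected before the final file list is produced.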

Versioning

Commit-specific metadata in the master manifest file is a snapshot of the “partition metadata” records for all partitions affected by that commit. When rolling back a commit, the “partition metadata” records for the affected partitions would be replaced with the “partition metadata” from previous commits. The rollback/restore/savepoint metadata also has to be enhanced to handle the manifest file updates.

File management

When ingesting records into the dataset and updating the master/partition manifest metadata, an “UPDATE_MANIFEST” action could use a mechanism similar to creating an “INFLIGHT” file, to prevent multiple actors/executors from modifying the file concurrently.

Also, we can employ a strategy of appending metadata records to the manifest files throughout the day and consolidating the appended blocks periodically (for example, once every 24 hours).



Implementation Approach #2: Manifests and Column Indexes


The main idea in this approach is as follows:

  1. Maintain a key-value file which maps each HUDI datafile to a unique integer ID
    1. This file is called the Master Manifest File
    2. Think of this as a mirror of how the NameNode stores (Inode -> FileStatus) information in its memory
  2. Every time an action takes place on the HUDI dataset, new files may be added (Commit, Compaction) and some files may be deleted (Clean, Rollback). At each such instance, a new file is created:
    1. The Delta Manifest File contains changes to the file system made since the Master Manifest was created.
    2. It contains the unique Integer ID of all newly added files.
    3. It contains “tombstones” for all files deleted
    4. The combination of Delta and Master Manifest provides the “current” view of the filesystem.
    5. Periodically, the latest Delta Manifest is merged back to the Master Manifest. This prevents the Delta Manifest from growing infinitely in size.
  3. For each “hot” column (i.e. a column that is used in query predicates), a single file called the Column Index File is maintained
    1. This file contains the following information [min_val, max_val, fileID]. In other words, it contains column value range for each Parquet file in the HUDI dataset (similar to how Parquet files contain Column value range).
    2. Using fileID here reduces the size of the Column Index (less data, better compression) and also has other benefits (explained later).
    3. Column Index can be added/removed dynamically
    4. Column Indexes can be intersected if a query has multiple columns in its WHERE clause.


A special Column Index File called the “Partition Index” is also maintained. This index file maps the name of each partition to the list of fileIDs for the files in that partition. It can be used to look up the list of all files within a partition.
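The file-to-ID mapping, tombstones, and column ranges described above can be pictured with the following hypothetical in-memory types; the prototype described later persists equivalent information in HFile/TFile key-value files.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory picture of the Approach #2 metadata; the prototype described
// later persists equivalent information in HFile/TFile key-value files.
public class Approach2Metadata {

    // Master Manifest: unique integer fileID -> data file path (mirrors Inode -> FileStatus).
    Map<Integer, String> masterManifest = new HashMap<>();

    // Delta Manifest: changes since the master manifest was written.
    Map<Integer, String> newFiles = new HashMap<>();   // newly assigned fileIDs
    Set<String> tombstones = new HashSet<>();          // paths deleted since the master manifest

    // One Column Index per "hot" column: [min, max] range observed in each Parquet file.
    static class ColumnIndexEntry {
        final long minVal;
        final long maxVal;
        final int fileId;
        ColumnIndexEntry(long minVal, long maxVal, int fileId) {
            this.minVal = minVal;
            this.maxVal = maxVal;
            this.fileId = fileId;
        }
    }

    // Partition Index: a special column index mapping partition name -> fileIDs in it.
    Map<String, Set<Integer>> partitionIndex = new HashMap<>();
}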

Some experimental numbers

Let’s take an example table :

Size: 1PB+, #Files: 2 million, #Partitions: 2000, Data files per partition: 1000

Based on the above dataset, these are some sample numbers for this approach:

    1. Raw size of Master Manifest (int -> filePath): 550MB
    2. Compressed size of Master Manifest (zstd -12, <1sec): 55MB
    3. Raw size of partition index file: 55MB
    4. Compressed size of partition index file (zstd -12, <200msec): 5MB

The sizes of other Column Indexes should be similar but depend on the size of the primitive value contained in the Column.

Workflow: Queries

SELECT * FROM table WHERE  x1 < C1 < y1 AND x2 < C2 < y2 …. ;

  1. Load the Column Index for C1 and find the list of fileIDs which satisfy the range of [x1, y1]
  2. Load the Column Index for C2 and find the list of fileIDs which satisfy the range of [x2, y2]
  3. Set intersection between the fileIDs in #1 and #2
  4. Read the Delta manifest + Master Manifest and lookup the paths of all the fileIDs in #3
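A minimal sketch of steps 1-4, modelling each column index as a hypothetical map of fileID -> {min, max} and the delta/master manifests as fileID -> path maps:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of the query-planning workflow above. Each column index is modelled as a
// hypothetical map of fileID -> {min, max}; the delta/master manifests as fileID -> path.
public class QueryPlanningSketch {

    // Steps 1/2: find the fileIDs whose stored range may overlap (x, y).
    static Set<Integer> matchingFileIds(Map<Integer, long[]> columnIndex, long x, long y) {
        Set<Integer> matches = new HashSet<>();
        for (Map.Entry<Integer, long[]> e : columnIndex.entrySet()) {
            long min = e.getValue()[0];
            long max = e.getValue()[1];
            if (max > x && min < y) {
                matches.add(e.getKey());
            }
        }
        return matches;
    }

    static List<String> planQuery(Map<Integer, long[]> indexC1, long x1, long y1,
                                  Map<Integer, long[]> indexC2, long x2, long y2,
                                  Map<Integer, String> deltaManifest,
                                  Map<Integer, String> masterManifest) {
        Set<Integer> candidates = matchingFileIds(indexC1, x1, y1);   // step 1
        candidates.retainAll(matchingFileIds(indexC2, x2, y2));       // steps 2 + 3 (set intersection)
        List<String> paths = new ArrayList<>();
        for (int fileId : candidates) {                               // step 4 (resolve fileID -> path)
            String path = deltaManifest.getOrDefault(fileId, masterManifest.get(fileId));
            if (path != null) {
                paths.add(path);
            }
        }
        return paths;
    }
}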

Workflow: Ingesting data

As part of a commit, when new records are inserted into the dataset, new parquet files are created and a new Commit is recorded in the Hudi Timeline.

Before the Commit is finalized (Inflight -> Committed), the following operations are performed:

  1. Read the Delta Manifest
  2. Assign fileIDs to the newly created files
  3. Add the Column Statistics from the newly added parquet files to the Delta Manifest
    1. This can be read from the footer of each parquet file by the Executor which writes it (WriteStatus objects are collected and stats are already dumped into the COMMIT file in JSON format)
  4. Write a new version of the Delta Manifest
  5. Finalize the Commit   
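A sketch of these pre-finalization steps; the DeltaManifest type and the inputs are hypothetical placeholders, and in Hudi the column statistics would come from the WriteStatus/parquet footers mentioned above.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the pre-finalization steps. The types are hypothetical placeholders; in Hudi
// the column statistics would come from WriteStatus / parquet footers collected per executor.
public class CommitMetadataUpdate {

    static class DeltaManifest {
        Map<Integer, String> newFiles = new HashMap<>();                 // fileID -> path added since master
        Map<Integer, Map<String, long[]>> columnStats = new HashMap<>(); // fileID -> column -> {min, max}
        int maxAssignedFileId;                                           // highest fileID handed out so far
    }

    // Steps 1-4: read the delta manifest, assign IDs to new files, record their column
    // statistics, and return the updated manifest to be written before the commit completes.
    static DeltaManifest applyCommit(DeltaManifest delta,
                                     List<String> newFilePaths,
                                     Map<String, Map<String, long[]>> statsByPath) {
        for (String path : newFilePaths) {
            int fileId = ++delta.maxAssignedFileId;               // step 2: sequential fileID
            delta.newFiles.put(fileId, path);
            delta.columnStats.put(fileId, statsByPath.get(path)); // step 3: footer/WriteStatus stats
        }
        return delta;                                             // step 4: caller persists, then finalizes the commit
    }
}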

Performance Notes

Each time data is ingested, the Column Indexes are rendered stale, i.e. they do not contain information about the files added since the Column Index was created. Writing a new Column Index on every ingestion is expensive. Hence, we can use one of these approaches:

  1. Maintain the range (min_val, max_val) information for newly added Parquet files within the Delta Manifest.
    1. PRO: Higher accuracy,   CON: Larger size of Delta Manifest
  2. Consider all files added since the Column Index was created as “matches” for the Column range. This is easy to do by recording the last-assigned-fileID in the Column Index. Since fileIDs are assigned sequentially, the newly added fileIDs fall in the range [last-assigned-fileID, current-maximum-fileID], as sketched below.
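A small sketch of option 2, assuming sequentially assigned fileIDs and a last-assigned-fileID recorded when the column index was written:

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of option 2: any fileID assigned after the column index was written is assumed
// to match the range, so the stale index never drops newly ingested files from the plan.
public class StaleIndexHandling {

    static Set<Integer> candidateFileIds(Map<Integer, long[]> columnIndex,
                                         int lastAssignedFileIdAtIndexCreation,
                                         int currentMaxFileId,
                                         long x, long y) {
        Set<Integer> candidates = new HashSet<>();
        for (Map.Entry<Integer, long[]> e : columnIndex.entrySet()) {
            if (e.getValue()[1] > x && e.getValue()[0] < y) {     // indexed files: real range check
                candidates.add(e.getKey());
            }
        }
        for (int id = lastAssignedFileIdAtIndexCreation + 1; id <= currentMaxFileId; id++) {
            candidates.add(id);                                   // newer files: assumed matches
        }
        return candidates;
    }
}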



Comparison between the Approaches


How many new files are added?
  • Approach #1: 1 master manifest file; 1+ manifest files per partition
  • Approach #2: 1 master manifest file; 1+ delta manifest files

How does it get a list of files in a single partition?
  • Approach #1: Read the Master Manifest to get the partition manifest path, then read the Partition Manifest file to get the list of all files in the partition
  • Approach #2: Read the Partition Index to get the list of fileIDs, then read the Master+Delta Manifest to resolve fileIDs to paths

How to list all files in the dataset?
  • Approach #1: Iterate over all the Partition Manifests to get the list of all files
  • Approach #2: Scan the Master+Delta Manifest to get the list of all files

Can a new Column be indexed later?
  • Approach #1: Yes, but all Partition Manifests and the Master Manifest need to be re-written
  • Approach #2: Yes, a single new file for the Column Index is created

Can a Column Index be removed later?
  • Approach #1: Yes, but all Partition Manifests and the Master Manifest need to be re-written
  • Approach #2: Yes, the single file for the Column Index is deleted

How many files are re-written / changed in a new Commit?
  • Approach #1: The Master Manifest and all the Partition Manifests for the partitions which are ingesting data
  • Approach #2: The Delta Manifest only

Does adding a new Column Index require a dataset-wide lock?
  • Approach #1: Yes. If an ingestion job is active, the Column Index will not be accurate as it may miss new files being added concurrently
  • Approach #2: No. The Column Index will be limited to the files already present; all new files added concurrently will be "assumed" to be a match for the Column Index

Does dropping a Column Index require a dataset-wide lock?
  • Approach #1: Yes, as all manifest files need to change
  • Approach #2: No, the Column Index file is simply deleted

Does provisioning (setting up for the first time) require a dataset-wide lock?
  • Approach #1: Yes
  • Approach #2: Yes

Is the size of the Manifest files bounded?
  • Approach #1: Master Manifest size is proportional to the number of partitions; Partition Manifest size is proportional to (number of files * number of columns being indexed)
  • Approach #2: Master Manifest size is proportional to the number of data files; each Column Index file is proportional to the number of files that contain that column

Does the size of metadata read during query planning remain constant as the dataset grows?
  • Approach #1: Partition Manifest files grow in proportion to the number of columns indexed. Since the entire partition manifest needs to be read, the size of metadata read grows with the number of columns indexed
  • Approach #2: Each Column Index is in a separate file which grows in proportion to the number of data files having that column. The Master+Delta Manifest files grow in proportion to the total number of data files, but these are key-value files (e.g. TFile or HFile), so they don't need to be read completely to find the values

If a query determines a column is present in 1000 partitions, how many metadata files will be read?
  • Approach #1: The Master Manifest is read to get the paths of the 1000 Partition Manifests, and the 1000 Partition Manifests are read to get the list of files
  • Approach #2: The Column Index is read to get the list of fileIDs, and the Master+Delta Manifest is looked up to get the file paths

Minimum amount of reads to perform query planning (suppose the Column is in a single file)?
  • Approach #1: Master Manifest + 1 Partition Manifest
  • Approach #2: Complete Column Index + fileID lookups in the Master+Delta Manifest

Size of the various files (2M files, 2000 partitions dataset)?
  • Approach #1: Master Manifest: ; Partition Manifest:
  • Approach #2: Master Manifest: ; Delta Manifest: ; Column Indexes:

Prototyping Details

We have done limited prototyping of the idea to gather some metrics and to further evaluate the feasibility of the feature. The prototype code also serves as a testbed to evaluate and compare ideas, file formats, and use-cases.

The prototype introduces a new class MetadataStore which implements the functionality of consolidated metadata. Notable functions include:

  1. MetadataStore.create() - Initializes the metadata from an existing HUDI dataset
  2. MetadataStore.update(HoodieCommitMetadata) - Updates the metadata after a COMMIT has been completed on the HUDI dataset
  3. MetadataStore.update(HoodieCleanMetadata) - Updates the metadata after a CLEAN has been completed on the HUDI dataset
  4. MetadataStore.getAllPartitions() - Returns a list of all partitions
  5. MetadataStore.getAllFilesInPartition(partition) - Returns a list of all files in a partition

In addition, there is a MetadataStoreFileSystemView which extends AbstractTableFileSystemView and provides file listing using the metadata store (AbstractTableFileSystemView.ensurePartitionLoadedCorrectly). All the performance tests use MetadataStoreFileSystemView, giving an accurate comparison against the HoodieTableFileSystemView used in the current code.

File Format and Information

The Master Manifest is an HFile-format file which contains the following key-value pairs:
    <fileID: Integer> → <filePath: String>

Hence, we map each file to an integer ID. In this format, any file's path can be found by looking up the file's unique ID.

The Partition Index is a TFile-format file which contains the following key-value pairs:
    <partition: String> → <fileIDs: List<Integer>>

Hence, we map each partition to a list of integer IDs, encoded as a byte array of 4-byte integers. In this format, the list of fileIDs for any given partition can be determined by a single lookup. To get the file paths, we then look up each fileID in the master manifest as an additional key-value lookup.
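The value encoding described above can be sketched with java.nio.ByteBuffer; block size, compression, and the TFile writer itself are left out.

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Sketch of the partition-index value encoding: a list of fileIDs packed as a byte
// array of 4-byte integers (the TFile writer itself is omitted here).
public class PartitionIndexEncoding {

    static byte[] encode(List<Integer> fileIds) {
        ByteBuffer buf = ByteBuffer.allocate(4 * fileIds.size());
        for (int fileId : fileIds) {
            buf.putInt(fileId);
        }
        return buf.array();
    }

    static List<Integer> decode(byte[] value) {
        ByteBuffer buf = ByteBuffer.wrap(value);
        List<Integer> fileIds = new ArrayList<>(value.length / 4);
        while (buf.hasRemaining()) {
            fileIds.add(buf.getInt());
        }
        return fileIds;
    }
}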

The Delta Manifest is an HFile-format file which contains the following key-value pairs:
    <fileID: Integer> → <filePath: String>     // New files created since the master manifest was created
    <KEY_DELETED: Integer> → <deletedFiles: List<String>>     // Files deleted since the master manifest was created
    <KEY_NEW_PARTITIONS: Integer> → <newPartitions: List<String>>  // New partitions created since the master manifest was created
    <KEY_NEW_PARTITIONS_partX: Integer> → <fileIDs: List<Integer>>  // The fileIDs in each new partition created since the master manifest was created

With this format, simple key value lookups can determine:

  1. All the new files created
  2. All the files deleted
  3. All the partitions created

This simple format has benefits:

  1. Very simple to create
  2. Lookups are limited to two files only (delta + master)
  3. Can be optimized for specific use-cases by saving processed information as additional keys
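The bounded two-file lookup can be sketched as follows, with plain maps standing in for the HFile readers and a set of deleted paths standing in for the KEY_DELETED entry:

import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Sketch of the bounded two-file lookup: a fileID is resolved against the delta manifest
// first, then the master manifest, and deleted files are filtered out via the tombstone key.
// The maps/sets stand in for HFile readers; deletedPaths corresponds to the KEY_DELETED entry.
public class ManifestLookup {

    static Optional<String> resolve(int fileId,
                                    Map<Integer, String> deltaManifest,
                                    Map<Integer, String> masterManifest,
                                    Set<String> deletedPaths) {
        String path = deltaManifest.get(fileId);          // files added since the master manifest
        if (path == null) {
            path = masterManifest.get(fileId);            // otherwise fall back to the master manifest
        }
        if (path == null || deletedPaths.contains(path)) {
            return Optional.empty();                      // unknown or tombstoned file
        }
        return Optional.of(path);
    }
}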

Initial Creation

The prototype has been developed through HUDI-CLI for ease of testing on existing large production datasets. No data is changed within the datasets. All the metadata created is saved in a temporary folder on HDFS.

hudi-cli> connect -path /some/hudi/dataset
hudi-cli> metadata create

Steps:

  1. Note the latest commit on the timeline (let's say 20200420112233.commit)
  2. Gather list of all partitions and sort them (sorting improves compression)
  3. For each partition, gather a list of files in the partition. This can be done in parallel for very large datasets
  4. Assign sequential fileIDs to each file. Files are sorted by their path.
  5. Create the master manifest file with the same instantTime (metadata_folder/20200420112233.master_manifest) 
  6. Process newer instants on the timeline (discussed later)

With this scheme, the metadata creation does not require any locking on the dataset and normal dataset pipelines (commit / clean / compaction) can continue. Changes done in parallel to the dataset will be processed incrementally by reading their instants from the timeline (part of delta manifest creation).
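Steps 2-4 of this bootstrap can be sketched as below; the per-partition file listing (step 3) is assumed to have been gathered already, possibly in parallel, and no locking is assumed, as noted above.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of steps 2-4 of the initial creation: sort partitions, sort files within each
// partition by path, and assign sequential fileIDs. The listing itself (step 3) is assumed
// to have been done already, possibly in parallel, and is passed in as input.
public class MetadataBootstrapSketch {

    // partition -> files in that partition, as gathered from the file system.
    static Map<Integer, String> assignFileIds(Map<String, List<String>> filesByPartition) {
        Map<String, List<String>> sortedPartitions = new TreeMap<>(filesByPartition); // step 2: sort partitions
        Map<Integer, String> masterManifest = new HashMap<>();
        int nextFileId = 0;
        for (Map.Entry<String, List<String>> partition : sortedPartitions.entrySet()) {
            List<String> files = new ArrayList<>(partition.getValue());
            files.sort(String::compareTo);                       // step 4: files sorted by path
            for (String file : files) {
                masterManifest.put(++nextFileId, file);          // sequential fileID assignment
            }
        }
        return masterManifest;
    }
}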

Metadata update

Each operation on the HUDI dataset creates some metadata (HoodieXXXMetadata) which is saved in the instants themselves. This data contains information which can be processed to find the file system changes that have occurred as part of the operation. Example:

  1. HoodieCleanMetadata has list of all files deleted
  2. HoodieCommitMetadata has list of new files created
  3. HoodieCompactionMetadata has list of new file slices/group created (in essence, new files created)

Before each instant is completed (activeTimeline.saveAsComplete), it is passed to MetadataStore.update(), which applies the update to the delta manifest.

hudi-cli> metadata update

This reads the timeline, finds all the instants since the master manifest was created, and calls MetadataStore.update() for each of them, creating the delta manifest.

Delta Manifest

Delta manifest files are named in the format  <instantTime>_<masterInstantTime>.delta_manifest  (e.g. metadata_folder/202004212211133_20200420112233.delta_manifest which names a delta manifest created at 202004212211133 on top of master manifest created above).

Each delta manifest is cumulative - it contains all changes on top of the master manifest. This increases the delta manifest creation time (slightly) as the last delta manifest needs to be read before a new one can be written. But this improves lookup time as it bounds the worst case lookup attempts to two files.

Example datasets

Dataset C:  files=1,050, partitions=719 

Dataset M: files=384,757, partitions=3,615

Dataset E: files=2,271,646, partitions=493

Performance numbers

hudi-cli> metadata perf

HFile parameters: 4KB block size, GZ compression.


Dataset C
  Manifest total size: 39,579
  Manifest total files: 2
  Manifest paths/sizes:
     20190618173541_20190618173541.2._partition_.idx: 11,611
     20190618173541_20190618173541.2.master_manifest: 39,579
  File counts: files=1,050, partitions=719
  Files/partition (avg): files=1.46
  MaxFileId: 1,050
  Partition lookup performance (existing / with consolidated metadata / difference):
    getAllPartitions: 7,798.00msec / 0.20msec / -100.00%
    getAllFilesInPartition: 6.40msec / 0.10msec / -98.44%
  SyncableFileSystemView performance per partition (existing / with consolidated metadata / difference):
    getAllBaseFiles: 1.00msec / 1.00msec / 0.00%
    getAllFileGroups: 102.00msec / 1.00msec / -99.02%
    getAllFileSlices: 277.00msec / 39.00msec / -85.92%
    getLatestBaseFiles: 11.00msec / 3.00msec / -72.73%
    getLatestFileSlices: 2.00msec / 2.00msec / 0.00%
    getLatestUnCompactedFileSlices: 1.00msec / 2.00msec / 100.00%

Dataset M
  Manifest total size: 8,570,289
  Manifest total files: 2
  Manifest paths/sizes:
     20200422230243_20200422230243.2._partition_.idx: 843,149
     20200422230243_20200422230243.2.master_manifest: 8,570,289
  File counts: files=283,675, partitions=3,617
  Files/partition (avg): files=78.43
  MaxFileId: 283,675
  Partition lookup performance (existing / with consolidated metadata / difference):
    getAllPartitions: 135,839.00msec / 11.30msec / -99.99%
    getAllFilesInPartition: 146.00msec / 0.50msec / -99.66%
  SyncableFileSystemView performance per partition (existing / with consolidated metadata / difference):
    getAllBaseFiles: 75.00msec / 193.00msec / 157.33%
    getAllFileGroups: 36.00msec / 140.00msec / 288.89%
    getAllFileSlices: 23.00msec / 82.00msec / 256.52%
    getLatestBaseFiles: 100.00msec / 187.00msec / 87.00%
    getLatestFileSlices: 65.00msec / 106.00msec / 63.08%
    getLatestUnCompactedFileSlices: 26.00msec / 109.00msec / 319.23%

Dataset E
  Manifest total size: 91,027,607
  Manifest total files: 2
  Manifest paths/sizes:
     20200424174758_20200424174758.2._partition_.idx: 6,626,417
     20200424174758_20200424174758.2.master_manifest: 91,027,607
  File counts: files=2,275,402, partitions=497
  Files/partition (avg): files=4,578.27
  MaxFileId: 2,275,402
  Partition lookup performance (existing / with consolidated metadata / difference):
    getAllPartitions: 602,342.00msec / 2.90msec / -100.00%
    getAllFilesInPartition: 17.20msec / 267.60msec / 1,455.81%
  SyncableFileSystemView performance per partition (existing / with consolidated metadata / difference):
    getAllBaseFiles: 207.00msec / 1,394.00msec / 573.43%
    getAllFileGroups: 247.00msec / 2,021.00msec / 718.22%
    getAllFileSlices: 287.00msec / 626.00msec / 118.12%
    getLatestBaseFiles: 1,086.00msec / 2,519.00msec / 131.95%
    getLatestFileSlices: 927.00msec / 3,701.00msec / 299.24%
    getLatestUnCompactedFileSlices: 496.00msec / 3,214.00msec / 547.98%

Units: bytes and milliseconds

Observations

  1. There is significant improvement in the file listing (getAllPartitions). The above timings do not use distributed listing (i.e. only a single Executor is used). With multiple executors the saving in wall clock time will be smaller, but the number of HDFS listing calls saved will still be the same.
  2. getAllPartitions is very fast - this is because the partition index is a very small file and scanning all keys in it is fast
  3. Getting the list of files in a partition is slightly slower.
    1. The HDFS NameNode returns file listings from an in-memory data structure, so the overhead is very low; for a small number of files per partition, or under low load, this is very fast.
    2. MetadataStore performs the following operations to get the files in a partition:
      1. Find the fileIDs in the partition (key lookup in the partition index) - this step is very fast
      2. For each fileID, look up the fileID in the delta manifest or master manifest - this step is slower
    3. This can be improved by choosing the correct parameters for the HFile - appropriate block-size and compression algorithm.
      1. Blocksize is important - small block size means we need multiple block loads to fetch files in a partition.
  4. The metadata store should be optimized for the following operations:
    1. Ingestion side:
      1. Getting list of all partitions
      2. Getting list of files in a partition
    2. Hive side
      1. Getting list of all files (SELECT * FROM ...)
      2. Getting list of files matching range query (SELECT * FROM ... WHERE COL1 > ..., COL2 < ...)  
  5. Different indexes and files will require different settings (block size, compression algorithm) for fine-tuned optimization. Since all these files are created from in-memory information, it may be possible to predict suitable values for each dataset rather than hardcoding them.

The above stats do not show the huge benefit of removing all file listings for data files. Ingestion, compaction and querying each perform at least one getAllPartitions(), which for a large dataset leads to a large number of calls to the NameNode.

Rollout/Adoption Plan

  • What impact (if any) will there be on existing users?
    • None. This functionality will be an add-on to existing datasets. If the metadata or indexes are not available, the functionality will not be enabled.
  • If we are changing behavior how will we phase out the older behavior?
    • This new feature is backwards-compatible. If the metadata or indexes are not available, the existing way of listing files through the NameNode will be used.

  • If we need special migration tools, describe them here.
    • NA

  • When will we remove the existing behavior?
    • Not required.

Test Plan

<Describe in few sentences how the RFC will be tested. How will we know that the implementation works as expected? How will we know nothing broke?>

9 Comments

  1. Prashant Wason please start a mailing list DISCUSS thread when ready

  2. One high level comment is that : could we try to design this using the same base/log file concepts that we have for data? 

    indexing is being rethought in a similar fashion.. and I think we can implement a metastore using a similar approach as well..  


    (may be this is implementation detail, that we can work out down the line) 


  3. >> could we try to design this using the same base/log file concepts that we have for data? 

    The idea is very similar in Approach #1 though the file formats chosen for the base/log files will be optimized for key-value lookups rather than columnar lookup.

    Base Manifest (called Master Manifest) will have information till the last-creation-time and the delta manifest will have the updates at each Commit time. Periodically we will merge the delta into the Base. This can use the familiar timestamp-as-fileid approach used to "tag" the HUDI Actions with a valid time and ID.


    >> indexing is being rethought in a similar fashion.

    Indexing is a different beast altogether with the following differences from this RFC:

    1. The very large number of UUIDs to be indexed which will lead to a very large index size. 
    2. The need for fast searches across this index which cannot be loaded in memory
    3. The need for updates without the option of re-writing the entire index.
  4. Prashant Wason I know you folks are blocked on this.. You will have a detailed review by monday..  

  5. Here's my take on the two approaches. Between the two, I do favor approach 2's way of tracking the file-level master manifest, without implicitly having two levels of hierarchy.


    However, I do feel there are lots more to address there. 

    1. RFC, in general reasons about files and file names. This feels very COW centric.. Ideally we should design using `FileSlice`s in mind and how we manage metadata around file slices (i.e a base file and a set of log files), not just files..  It would be great if we can rewrite sections to make that clear. 
    2. I am not sure if we need to introduce another integer mapping for files? could we try designing around fileIds that we already have.  
    3. On the column ranges to file mappings, I feel we also need to think through interesting situations due to updates/deletes.e.g, if say for column c1 : (10, 20) → f1, f2 and we deleted a bunch of records in f2, now f2 may no longer hold data for the range but its not possible to decide if f1's range should shrink or not. Specifically, we may need to track/persist ranges per fileId and then derive the aggregated/collapsed ranges from that..
    4. On approach 2, I am concerned about the amount of individual column index files that need to be maintained.. 
    5. We do need to nail down key implementation details like bootstrapping the stats/manifests, pushing this under the FileSystemView abstractions, with at least one concrete query engine integration fleshed out and prototyped before we can finalize, I feel.

    In general, I would love if we designed this in a log structured manner i.e updates to manifest are logged and can be compacted. (This is merely trying to formalize the notion of delta files you have in approach 2). 

    What if we had the following as three file groups 


    file_metadata : Base HFile, with key: data_fileID, value : latest file slice for the fileID. Whenever new file slice is added (compaction, merge handle) or new log files are added we log it to a log file, again with inline HFile (as we are exploring in RFC-08).

    file_stats_metadata: Base HFile with key :  <column_name>+<fileID> , value : min, max, nulls etc... Each time this changes (either via update/insert/delete), we log that and compactor will know to merge these into final stats. 

    column_stats_metadata : This is all the column indexes that you originally had as separate files , with base HFile key : <column_name> + <range> : (fileids) ... similarly we can log changes to this.. 


    My thinking is we can leverage the existing machinery (I and others can help provide these foundations for you to build on) we have for logging and compaction and design something incremental and scalable from the start. 





  6. More discussions between Nishith Agarwal, Prashant Wason, Balaji Varadarajan, Vinoth Chandar, Balajee Nagasubramaniam

    Agree on :

    • In any approach, we will try to implement mutations, deletes, cleaning , compaction using the same code/components as the actual data..

    Open items : 

    • If we have multiple file groups for manifest metadata, don't we have to list `.hoodie/metastore` ... 
      • A: we can have another level of indirection if needed..
    • If we have uuid fileGroupID in column indexes, compression won't be good.. bitwise filtering operations not possible
      • A: Maintain an integer to FileGroup mapping? But then, the column indexes need to be read first to look up the `file_metadata` or master manifest.
      • What if we made a fileNumber as key.. 
  7. sivabalan narayanan can you share a pointer to your hfile benchmarks here? 

  8. Open items 

    • FileID generation in the face of concurrent writers: they may try to allocate the same fileIDs. Can we benchmark the performance benefits from fileIDs?
    • In general we need to nail down how we handle async operations like cleaning/compaction.. and let them publish to the consolidated metadata 
    • Can we implement the delta manifest using the existing log format writer? 


    cc Nishith Agarwal Balaji Varadarajan Balajee Nagasubramaniam Prashant Wason

  9. For implementing phase 1 (just eliminating listing using partition index) 

    1. HoodieTable abstraction has pluggable base and log formats 
      1. helps rfc-08, rfc-02 (orc/avro)...rfc-15 phase 2 column indexes..
    2. New HoodieFileGroupIO  between IOHandles and ParquetWriters/ParquetReader
      1. HoodieFileGroupIO.withBaseFileFormat(parquet).withLogFileFormat(avro).withWriteRecordFormat(HoodieRecord.class)... 
      2. rfc-08/rc-15 - HoodieFileGroupIO.withBaseFileFormat(HFile).withLogFileFormat(HFile).withWriteRecordFormat(HoodieRecord.class)
      3. rfc-02 -  HoodieFileGroupIO.withBaseFileFormat(Orc).withLogFileFormat(Orc).withWriteRecordFormat(HoodieRecord.class)
      4. for input formats : HoodieFileGroupIO.....withReadRecordFormat(ArrayWritable.class) ...  
      5. for spark real time view : HoodieFileGroupIO.....withReadRecordFormat(Row.class) ..
    3. Multi Table Commits
      1. Goal : atomically commit to both data table and metadata table. 
      2. Implementation : Each HoodieTable has its own timeline..  notion of parent timeline.. i.e dataset level timeline.
      3. Three timelines on each dataset
        1. data timeline : current timeline in .hoodie ... 
        2. metadata timeline : added under .hoodie/metadata.. 
        3. dataset timeline : added under .hoodie/global-timeline
      4. Single commit on a dataset 
        1. Start a commit on dataset timeline (c1) (requestd state)
        2. Start a commit on data timeline at same instant time (c1) (requested state)
        3. Start a commit on metadata timeline at same instant time (c1)
        4. Perform data write
        5. Perform metadata write.
        6. Complete data commit
        7. Complete metadata commit
        8. Complete global commit.. 
      5. Key change on query side : Additionally check a "parent" timeline for instant state
        1. C1 may be complete on the data timeline, but if c1 is not completed on the parent/global timeline..
      6. Other benefits 
        1. Multi hudi dataset, rfc-08 will just add index hoodietable.
      7. Alternative : One single timeline shared across 2 tables (data and metadata).. 
        1. We have same number of files on timelines.. (1 less list call)
        2. Archival runs independently.. 
        3. Share .commit file will have writeStatuses for both data and metadata
    4. Multi writer concurrency 
      1. Compaction/Cleaning may change metadata in parallel with ingest
      2. Logging in parallel into the same file group and merging based on instant time should work..