Current state: COMPLETED
Discussion thread: here
Apache Hudi supports efficient upserts to datasets by tracking the record-key to fileId mapping. Additionally, Hudi guarantees snapshot isolation using an MVCC model by carefully laying out data and tracking metadata. The current implementation of the Hudi Writer in Uber does not make use of this rich metadata for building and managing file-system views. The file-system view building process relies on file-system status calls, which can pose overhead on the underlying file-system services. The current approach is to make partition-listing file-system calls for each task performing creation/merging/appending of files, and again during index lookup. This pattern of not leveraging Hudi metadata is also seen in the Cleaner job, which adds yet more overhead on the underlying file-system. This proposal aims to address these inefficiencies in a holistic manner by leveraging Hudi's rich metadata, which is already collected and stored as part of the Hudi timeline.
Hudi manages the file-system layout using two interconnected abstractions: the File-System View and the Timeline View.
- FileSystem View - An abstraction which hierarchically tracks the partitions and the files present in each partition. This abstraction allows many versions of a file to be present in each partition, and identifies the file-versions valid at any snapshot by using a careful naming scheme.
- Timeline View - An abstraction which provides a timeline of the Hudi operations, and their metadata, performed on a dataset; each operation is either completed or inflight.
We propose an embedded timeline service running in the same JVM as the Write Client. This service can incrementally read the new metadata added by Hudi as part of each commit and build the file-system view directly, instead of performing repeated file-system listings. The timeline service exposes a REST interface which serves file-system view calls from executors.
Here are the high-level requirements for building such an embedded timeline service:
- Memory Efficient - Cache needs to scale even when a given ingestion/compaction job touches a large number of partitions.
- Reduced File-System Overhead - The timeline service must substantially reduce the number of file-system calls needed for building and managing file-system views.
- Correctness & Progress - The ingestion/compaction job must be able to proceed correctly even when the cache cannot be built or when executors have access issues. This ensures the service introduces no additional failure points.
- Lookup Performance - For both ingestion and compaction, the biggest bottleneck is either the index lookup (during the indexing stage) or the actual file writing (during the file-write stage). In all cases, the file-system view lookup time must not increase significantly and should remain negligible compared to these real bottlenecks.
Embedded Server within Driver:
The file-system view is built in the driver. An HTTP server is then started in the driver, and executors connect to it to access file-system views. The file-system view cache in the driver can be configured to use one of several storage types:
- Pure In-Memory Cache : For small datasets, the entire file-system view can be kept on the JVM heap and served from there. This is likely the fastest serving mode.
- Spillable Disk Cache : For larger datasets, the file-system view can be stored in a spillable map which uses bounded memory. Once the memory limit is reached, additional partition views of the dataset are stored in an indexed file. With a non-sorted key-value lookup, this scales to large datasets at the expense of lookup speed. The current implementation does not take advantage of access patterns: once a key is written to disk, it stays on disk for its entire lifetime, and the layout is not ideal for prefix scanning.
- Embedded Key-Value Store : This is similar to the spillable disk cache in that it works for larger datasets. By using a key-value store based on an LSM tree, the write speed is comparable to the spillable disk cache, and with periodic compaction the read speed is not sacrificed. This storage option leverages database technology to use a richer (more granular) schema suited to the lookup use-cases, and is also a natural fit for when this embedded service becomes a full-blown standalone service.
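The spillable-cache behavior described above can be sketched as follows. This is a minimal illustration of the bounded-memory idea with its "once spilled, always on disk" limitation, not Hudi's actual implementation; all class and method names here are hypothetical, and a plain map stands in for the indexed spill file.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch (not Hudi's actual code) of a file-system view
// cache that keeps a bounded number of partition views in memory and
// spills the rest to a secondary store.
class SpillableViewCache {
    private final int maxInMemoryEntries; // stand-in for a byte-based memory budget
    private final Map<String, List<String>> inMemory = new HashMap<>();
    // Stand-in for the indexed spill file on disk.
    private final Map<String, List<String>> spilled = new HashMap<>();

    SpillableViewCache(int maxInMemoryEntries) {
        this.maxInMemoryEntries = maxInMemoryEntries;
    }

    void put(String partition, List<String> fileSlices) {
        // Once the in-memory budget is reached, new partition views go to
        // the spill store and stay there for their lifetime (no promotion
        // back to memory), mirroring the limitation described above.
        if (inMemory.size() < maxInMemoryEntries) {
            inMemory.put(partition, new ArrayList<>(fileSlices));
        } else {
            spilled.put(partition, new ArrayList<>(fileSlices));
        }
    }

    List<String> get(String partition) {
        List<String> v = inMemory.get(partition);
        return v != null ? v : spilled.get(partition);
    }

    int inMemoryCount() { return inMemory.size(); }
    int spilledCount()  { return spilled.size(); }
}
```

An LSM-based embedded store would replace the two maps with a single key-value store whose compaction keeps reads fast as the view grows.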
If, for some reason, an executor is not able to reach the embedded server in the driver, the remote proxy on the executor side will transparently build a local file-system view for the concerned partition. This prevents additional points of failure.
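The executor-side fallback can be sketched as below: the proxy first tries the remote timeline server and, on any failure, transparently falls back to building a local view for the requested partition. The names are illustrative (not Hudi's actual `RemoteHoodieTableFileSystemView` API), and plain suppliers stand in for the REST call and the local listing.

```java
import java.util.List;
import java.util.function.Supplier;

// Sketch of the transparent fallback described above. Hypothetical API.
class RemoteViewProxy {
    private final Supplier<List<String>> remoteCall;    // e.g. an HTTP GET to the driver
    private final Supplier<List<String>> localFallback; // builds a local view by listing the partition

    RemoteViewProxy(Supplier<List<String>> remoteCall,
                    Supplier<List<String>> localFallback) {
        this.remoteCall = remoteCall;
        this.localFallback = localFallback;
    }

    List<String> latestFileSlices() {
        try {
            return remoteCall.get();
        } catch (RuntimeException e) {
            // No additional failure point: degrade to a direct local listing.
            return localFallback.get();
        }
    }
}
```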
Algorithm for Incremental Timeline Syncing:
All calls to the Timeline Service for fetching the view include the latest completed instant and a timeline signature (hash) as known by the client. The timeline service can then dynamically detect whether its view is up to date or needs syncing. If one or more instants have been added, the timeline service lists the metadata folder and reads the new metadata incrementally:
- New commits/delta-commits : The metadata part of the instant file contains the partitions and files touched. The timeline server updates the latest file-slices with this new information.
- Cleans : The metadata part of the instant file contains the partitions and files removed. These old file slices are removed from the view without listing the file-system again.
- Rollback of old commits/delta-commits : The metadata part of the instant file contains the partitions and files removed. These files are removed and the corresponding file-slices merged. Again, this is done in the view without listing the file-system.
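The handshake above can be sketched as follows. This is a minimal model, not the actual Hudi TimelineService API: the client sends the last completed instant and timeline hash it knows, and the server re-syncs its cached view only when they diverge from its own state.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the incremental-sync check described above. Hypothetical names.
class TimelineSync {
    private final List<String> completedInstants = new ArrayList<>();
    private int syncCount = 0;

    void addInstant(String instantTime) { completedInstants.add(instantTime); }

    String lastInstant() {
        return completedInstants.isEmpty()
            ? "" : completedInstants.get(completedInstants.size() - 1);
    }

    // Stand-in for a real timeline signature.
    String hash() {
        return Integer.toHexString(String.join(",", completedInstants).hashCode());
    }

    /** Returns true if the request triggered a (re)sync of the cached view. */
    boolean handleRequest(String clientLastInstant, String clientHash) {
        if (lastInstant().equals(clientLastInstant) && hash().equals(clientHash)) {
            return false; // view is up to date; serve straight from the cache
        }
        // Otherwise: list the metadata folder and apply only the new
        // instants (commit/clean/rollback metadata) to the cached view.
        syncCount++;
        return true;
    }

    int syncs() { return syncCount; }
}
```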
The reduction in file-system calls from the above approach will be felt most when the embedded timeline-service instance is servicing more than one write operation (ingest/compact/clean/rollback), and when the service is run in standalone mode.
Algorithm for Incremental Cleaning:
For cleaning by commits, which is the de-facto cleaning policy, the clean metadata tracks the oldest commit retained. By listing the current timeline, the cleaner can identify the instants to be removed, after filtering out the new commit instants that need to be retained. The cleaner then looks at each of these instants, including interleaving delta-commits and rollbacks/restores, and identifies the partitions and files to be deleted. This information is already available in the metadata, so no file-system listing is needed. The cleaner can then remove the files and record the operation in a new instant file.
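The selection step above can be sketched as below, under simplifying assumptions: instant times sort lexicographically, each instant's metadata already records the files it wrote, and the details of which file versions are actually superseded are elided. Names are illustrative, not Hudi's CleanPlanner API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;

// Sketch of incremental cleaning: candidate files come from instant
// metadata, with no file-system listing. Hypothetical types.
class IncrementalCleaner {
    /**
     * @param commitMetadata instantTime -> files recorded in that instant's metadata
     * @param oldestCommitToRetain everything strictly before this instant is cleanable
     */
    static List<String> filesToClean(SortedMap<String, List<String>> commitMetadata,
                                     String oldestCommitToRetain) {
        List<String> toDelete = new ArrayList<>();
        // headMap is strictly less-than: all instants before the retained commit.
        for (List<String> files : commitMetadata.headMap(oldestCommitToRetain).values()) {
            toDelete.addAll(files);
        }
        return toDelete;
    }
}
```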
File System View Bootstrap:
In the first version, the embedded timeline service starts from scratch in the driver when the write client is instantiated. When the service is used for ingestion, the partitions found in the input batch are used to bootstrap the file-system view in the timeline server: the timeline service performs a file-system listing call for each such partition and populates the view. The initial file-system call overhead is similar to that of the current caching mechanism.
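The bootstrap step can be sketched as below: only the distinct partitions present in the input batch are listed, each exactly once. The names and the listing function are illustrative stand-ins, not Hudi's actual API.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Sketch of file-system view bootstrap from an input batch. Hypothetical names.
class ViewBootstrap {
    /** Lists each distinct partition of the input batch exactly once. */
    static Map<String, List<String>> bootstrap(List<String> inputPartitions,
                                               Function<String, List<String>> listPartition) {
        Map<String, List<String>> view = new HashMap<>();
        // Deduplicate: many records in the batch can share a partition,
        // but each partition needs only one file-system listing call.
        for (String p : new LinkedHashSet<>(inputPartitions)) {
            view.put(p, listPartition.apply(p));
        }
        return view;
    }
}
```

The checkpointing approach in "Future Work" would shrink this further by persisting the view across runs instead of re-listing.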
The checkpointing approach described in the “Future Work” section will be used to greatly reduce the file-system calls during bootstrap.
Alternatives Considered for Timeline Caching :
In-Memory File System View Sent to each executor:
In this case, a file-system view is built in the driver as a regular Java map and shipped to the Spark executors as part of the task closure. While this is a very simple model to implement, it does not scale for large datasets, as shown above. The increase in memory requirements is not sustainable for running ingestion/compaction at scale.
File System View Managed as Spark Broadcast Variable:
In this case, the file-system view manager is again built in the driver and distributed as a read-only broadcast variable to executors for file-system view lookup. Even though this would amortize the memory overhead when multiple executors run on a single physical machine, there is no guarantee that the cluster manager will place the executors so as to realize these savings. Also, this would be a Spark-only solution and cannot be extended in general.
Rollout will be controlled by configuration to enable/disable the embedded timeline service and incremental syncing.
- Unit testing to cover all operations with the embedded server enabled
- Performance benchmarks to measure the reduction in file-system calls and view lookup latency
- Testing using production scale workloads