Proposers

Approvers

Status

Current state"INACTIVE

JIRA:   HUDI-86

Released: TBD

Abstract

Hudi allows us to store multiple versions of data in the def~table over time while providing `snapshot isolation`. The number of versions is configurable and depends on how much space one is willing to trade for those older versions. Hence, as one can imagine, Hudi already has the capability to access the state of a table (def~table) at a certain point/instant in time. This could be done by allowing read mechanisms from a Hudi table (def~table) that go back and read the version of the data that was written at a particular point/instant in time (see instant time).

Background

Hudi provides 3 different def~query-types to access data written in 2 different def~table-types - def~copy-on-write (COW) and def~merge-on-read (MOR).

  1. def~read-optimized-query allows reading data written in columnar format with columnar read performance
  2. def~snapshot-query reads a combination of avro and parquet data, trading read performance for lower data latency (right now applicable only to MOR)
  3. def~incremental-query allows one to extract incremental change logs from a Hudi table. The incremental view builds on top of the other 2 views.

Hudi supports 2 types of queries - Snapshot & Incremental.

  • def~query scan mode snapshot
    Snapshot queries access the latest state of a dataset. The default scan mode is set to LATEST.
  • def~query scan mode incremental
    Incremental queries provide the incremental changes between times (t1, t2]; one can control this by using a scan mode called INCREMENTAL.

All of these views are exposed by intercepting the planning stage through the HoodieInputFormat. Before we dive further into incremental changes, I want to bring to everyone's attention what incremental changes from Hudi really mean. For example, if you have a log table (say log events from Kafka), incremental_pull (t1, t2] would provide you all the records that were ingested between those instants.

If you have a hudi def~table into which you are ingesting database change logs (updates), then incremental_pull (t1, t2] would provide you all the records that were either inserted or updated between those time instants. These records are a full representation of the updated record, meaning, updated_record = (unchanged_columns + changed_columns).
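To make this concrete, below is a minimal sketch of such an incremental pull through the Spark DataSource. The option keys mirror the DataSourceReadOptions constants (VIEW_TYPE_OPT_KEY, BEGIN_INSTANTTIME_OPT_KEY, END_INSTANTTIME_OPT_KEY) as of this writing; the table path and instant times are placeholders, and exact constants may differ across Hudi versions.

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class IncrementalPullExample {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("hudi-incremental-pull")
            .getOrCreate();

        // Pull records whose _hoodie_commit_time falls in (t1, t2].
        Dataset<Row> changes = spark.read()
            .format("org.apache.hudi")
            .option("hoodie.datasource.view.type", "incremental")
            .option("hoodie.datasource.read.begin.instanttime", "20190801000000") // t1, exclusive
            .option("hoodie.datasource.read.end.instanttime", "20190802000000")   // t2, inclusive
            .load("/path/to/hudi_table");

        changes.show();
      }
    }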

Hive queries

We set certain configurations for `Hudi` to understand whether this is a Snapshot based query (def~query scan mode snapshot) or an Incremental query (def~query scan mode incremental). If it is an incremental query, the user can provide the (min, max) lastInstant to pull incremental changes. For def~read-optimized-query, we are able to figure out exactly which files changed and can limit the number of files read (and hence the amount of data) during the planning phase. For def~snapshot-query, since we are not able to easily figure out which log files have data from which commits/instants (def~commit / def~timeline), we rely on Hive to read the entire data and on the user to apply the filters in their query, such as: select * from table where _hoodie_commit_time > min and _hoodie_commit_time < max. (def~commit-time)
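As an illustration, the sketch below shows how these scan-mode configurations could be set on a JobConf. The property-name patterns (hoodie.<table>.consume.mode, hoodie.<table>.consume.start.timestamp, hoodie.<table>.consume.max.commits) follow those defined in HoodieHiveUtil; the table name hudi_table and the instant time are placeholders.

    import org.apache.hadoop.mapred.JobConf;

    public class HiveIncrementalConfigExample {
      public static JobConf incrementalJobConf() {
        JobConf jobConf = new JobConf();
        // Switch table "hudi_table" (placeholder) from the default snapshot scan to incremental.
        jobConf.set("hoodie.hudi_table.consume.mode", "INCREMENTAL");
        // Only commits strictly after this instant are pulled.
        jobConf.set("hoodie.hudi_table.consume.start.timestamp", "20190801000000");
        // Cap the number of commits pulled in one run.
        jobConf.set("hoodie.hudi_table.consume.max.commits", "3");
        return jobConf;
      }
    }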

Presto queries

For both def~read-optimized-query and def~snapshot-queries, we rely on Presto to read the entire def~table and on the user to apply the filters in their query, such as: select * from table where _hoodie_commit_time > min and _hoodie_commit_time < max.

Spark queries

For Spark based incremental pull, for def~read-optimized-queries, Hudi has a special technique to avoid the planning phase through the HoodieInputFormat and directly access the data that changed. A similar implementation is required for def~snapshot-query; for def~snapshot-queries, we fall back on similar paths of applying query filtering as described above.

-----------------

As you can imagine, there is already a "hacky" way to achieve time travel using the following methodology: select * from table where _hoodie_commit_time <= point_in_time. Obviously, this is not very efficient and can also lead to inconsistent results, since the point_in_time might be different from the hoodie def~instant-time. If another commit is inflight between point_in_time and the def~instant-time closest to it, this query will provide different results for different runs.

Note that there are ways to improve such queries via predicate pushdown on the _hoodie_commit_time field, which is out of the scope of this RFC.

We would like to build on these existing capabilities of Hudi to provide point in time queries - accessing the state and contents of the table at a particular instant in time in the past.

Implementation

Introduce another type of snapshot def~query-scan-mode, POINT_IN_TIME. This will enable scanning the Hudi def~table for files created on or before the supplied point_in_time. The challenge, as described above, is to find a solution to the inconsistent results that arise when a user runs the same POINT_IN_TIME query multiple times.

There are 2 options here:

1) Hudi provides a list of timestamps that can be supplied by the user as the point_in_time to query against. Hudi writes the commit/def~instant-times to a timeline metadata folder and provides APIs to read the timeline. At the moment there are 2 ways to read the timeline: a) The HoodieActiveTimeline class can be instantiated on the client to get a list of the completed def~commits. Hudi exposes a HoodieReadClient to users to enable reading certain contents and performing certain read-level operations on the Hudi def~table. I'm proposing we add another method, getCompletedInstants(), that allows users to programmatically get a list of all the timestamps against which they can query the table to get consistent results. b) Users of Hive queries who are more SQL friendly can perform a query such as: select _hoodie_commit_time from table sort by _hoodie_commit_time, which provides an ordered list of all the def~commit-times.

The client can perform some windowing operations on this returned list of timestamps to get the closest commit time that matches the point_in_time the user wants to query against, as sketched below.
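A minimal sketch of that client-side windowing, assuming the proposed getCompletedInstants() returns completed def~instant-times as yyyyMMddHHmmss strings (whose lexicographic order matches chronological order); PointInTimeResolver is a hypothetical helper, not an existing class.

    import java.util.List;
    import java.util.TreeSet;

    public class PointInTimeResolver {
      /**
       * Returns the closest completed commit time at or before the supplied
       * point_in_time, i.e. the instant a consistent POINT_IN_TIME query
       * should actually run against.
       */
      public static String resolve(List<String> completedInstants, String pointInTime) {
        TreeSet<String> instants = new TreeSet<>(completedInstants);
        String floor = instants.floor(pointInTime); // largest instant <= pointInTime
        if (floor == null) {
          throw new IllegalArgumentException("No completed commit at or before " + pointInTime);
        }
        return floor;
      }
    }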


Hive queries

Users can pass a new def~query-scan-mode via the following config in JobConf → https://github.com/apache/incubator-hudi/blob/master/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java#L30

We will introduce a new config called HOODIE_END_COMMIT_PATTERN, which will represent the def~commit-time up to which the user wants to scan the table (def~table). This can be thought of as asking the question -> select * from hudi_table where timeAsOf (commit_time)
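A sketch of how this could look on the JobConf. The property name hoodie.<table>.consume.end.timestamp is illustrative only - the actual name would be defined by the proposed HOODIE_END_COMMIT_PATTERN, and the POINT_IN_TIME consume mode does not exist yet.

    import org.apache.hadoop.mapred.JobConf;

    public class PointInTimeConfigExample {
      public static JobConf pointInTimeJobConf() {
        JobConf jobConf = new JobConf();
        // Proposed scan mode (not yet available).
        jobConf.set("hoodie.hudi_table.consume.mode", "POINT_IN_TIME");
        // Illustrative property for HOODIE_END_COMMIT_PATTERN: scan files
        // created on or before this def~commit-time.
        jobConf.set("hoodie.hudi_table.consume.end.timestamp", "20190801120000");
        return jobConf;
      }
    }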


Presto queries

We don't have a way to perform incremental pull right now. Since we are building on similar concepts to support time-travel, the only way this will work is through a query as described above:

select * from table where _hoodie_commit_time <= timeAsOf (commit_time)

This RFC does not propose implementation/changes for Presto.


Spark queries

Incremental pull using Spark works with a custom implementation of org.apache.spark.sql.sources.BaseRelation (called IncrementalRelation), which is chosen based on the VIEW_TYPE that the user can pass from a Spark DataSource. This is slightly different from the way users query Hive/Presto tables. The Hive/Presto tables don't really expose the query types (def~read-optimized-query, def~snapshot-query, def~incremental-query) to users but instead expose InputFormats that build on those views. As part of those input formats, we support different def~query-scan-modes such as SNAPSHOT or INCREMENTAL (and now our proposed POINT_IN_TIME_SNAPSHOT).

We might need a way to standardize the way users think about querying hudi tables using Spark/Hive/Presto. At the moment, I'm proposing another VIEW_TYPE (def~query-type) to be introduced in Spark, POINT_IN_TIME, which will work in conjunction with the already present config END_INSTANTTIME_OPT_KEY to provide the snapshot view of a table at a particular instant in time (similar to select * from table where _hoodie_commit_time <= timeAsOf (commit_time)).
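A sketch of what that could look like from the DataSource side. END_INSTANTTIME_OPT_KEY ("hoodie.datasource.read.end.instanttime") already exists; the point_in_time view type is the proposal of this RFC and does not exist yet, and the table path is a placeholder.

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class PointInTimeReadExample {
      public static Dataset<Row> readAsOf(SparkSession spark, String commitTime) {
        return spark.read()
            .format("org.apache.hudi")
            .option("hoodie.datasource.view.type", "point_in_time")       // proposed, not yet available
            .option("hoodie.datasource.read.end.instanttime", commitTime) // timeAsOf(commit_time)
            .load("/path/to/hudi_table");
      }
    }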

Caveats/Open Items

  • The number of versions to keep should match the number of def~commits the client wants to travel back. We need a way to enforce this.
  • The proposed approach requires the client to perform some work and enforces some limitations:
    • Clients can only time-travel based on the def~commit-times of a hudi def~table. They have to figure out a way to map the timestamp they want to travel to against the def~commit-time that matches closest to it.
    • Clients have to get a list of the valid timestamps (hudi def~commit-times) to time travel against.

Rollout/Adoption Plan

  • This change introduces new configs to enable time travel for hudi tables, but at the same time does not change existing features.

Test Plan

  • Unit tests, including ones that cover different cases of time-travel, to ensure consistent results.
  • Unit tests, including ones that cover what happens if the required versions are not present.
  • Testing with the hudi test suite, by running a test workflow for a few days, with the cleaner etc. running alongside queries.




12 Comments

  1. Nishith Agarwal can you please start a DISCUSS thread for this, so everyone's aware.

  2. @Nishith Agarwal still digesting but looks good so far, +1.

    Please confirm that the effect is "repeatable reads".

    Otherwise the semantics of all concepts involved, how commits work with COW and MOR, including current semantics of "time travel" will stay the same - right?

    there is already a "hacky" way to achieve time travel using the following methodology


  3. SemanticBeeng I think you've tagged the wrong person in your above comment (smile). The semantics of repeatable reads are contingent on the user understanding the difference between any_time and commit_time. As you may have read in the open items, this understanding will be forced on the clients/users.

  4. Nishith Agarwal left a few comments. I think point in time can be introduced as a query configuration on top of the snapshot/incremental view, not needing a new view type? Some places indicate otherwise - can you clarify?


    +1 on the idea itself.

  5. Yes, there is not going to be a new VIEW type; it will mostly be a configuration on top of the snapshot view.

    I'll read it over again and see if there are places that indicate otherwise, that's not the intent. 

    Please comment on the Open items/Caveats when you can.

  6. All: am reviewing this with an eye on the use of `Hudi` as a `feature store`, where most of the use cases are about changes after / excluding the initial ingestion.

    In doing so I feel the need to clarify concepts and am doing that in the background - the informal use of design and implementation concepts is fine for people with deep implementation knowledge, but not for people looking to comprehend the design of Hudi coming from outside.

    To review my changes and check they do not change the original semantics please see this diff https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=135859842&selectedPageVersions=19&selectedPageVersions=7.


  7. SemanticBeeng Thanks for taking the initiative to update this RFC to reflect links to the concepts that you think are useful to anyone trying to understand hudi. Shall we sequence the work this way: First, reach consensus on the implementation of the POINT_IN_TIME queries. Are you in agreement with the implementation principle? If yes, could you please update the page to reflect the same; if not, let's chat more about the implementation and your doubts. Second (and in parallel), we should enhance this document with references to concepts, which you are already doing. Once you are done, please let me know so I can take a look over the entire diff (between what you have edited and what was there before) and provide my feedback. How does this sound?

  8. I am okay with the high level approach here. 

  9. Nishith Agarwal

    I implemented the specified POINT_IN_TIME: you can view the most recent snapshot before the given POINT_IN_TIME. This approach is only applicable to historical snapshot queries of the *_ro view of COW and MOR tables. There is no support for *_rt snapshot queries (our business scenario does not need it for now). Regarding my code, could the community give a suggestion?


    Class: HoodieParquetInputFormat.java  Method: filterFileStatusForSnapshotMode

    Change: from 'roView.getLatestBaseFiles().collect(Collectors.toList())' to 'getHoodieBaseFileByPointInTime(...)'

    Rule:

    • take the latest snapshot by default.
    • when HOODIE_POINT_IN_TIME is specified, the latest base file before POINT_IN_TIME is obtained in each partition.

      /**
       * point in time
       */
      public static final String HOODIE_POINT_IN_TIME = "hoodie.%s.consume.point.in.timestamp";

      private List<HoodieBaseFile> getHoodieBaseFileByPointInTime(String tableName, BaseFileOnlyView roView) {
        String pointInTime = job.get(String.format(HOODIE_POINT_IN_TIME, tableName));
        LOG.info(String.format("** POINT_IN_TIME : %s", pointInTime));

        // by default, get the latest base files from each partition
        if (StringUtils.isNullOrEmpty(pointInTime) || !pointInTime.matches("^\\d{14}")) {
          LOG.info("** get latest base file");
          return roView.getLatestBaseFiles().collect(Collectors.toList());
        }

        // otherwise honor the pointInTime supplied via the job conf
        List<HoodieFileGroup> partitionToFileGroups =
            ((HoodieTableFileSystemView) roView).fetchAllStoredFileGroups().collect(Collectors.toList());
        List<HoodieBaseFile> hoodieBaseFiles = new ArrayList<>();
        for (HoodieFileGroup partitionToFileGroup : partitionToFileGroups) {
          String partitionPath = partitionToFileGroup.getPartitionPath();
          LOG.info(String.format("** POINT_IN_TIME : partitionPath = %s", partitionPath));
          List<HoodieBaseFile> partitionHoodieBaseFiles =
              roView.getLatestBaseFilesBeforeOrOn(partitionPath, pointInTime).collect(Collectors.toList());
          hoodieBaseFiles.addAll(partitionHoodieBaseFiles);
        }
        if (CollectionUtils.isEmpty(hoodieBaseFiles)) {
          throw new HoodieIOException("HoodieBaseFile is null");
        }
        return hoodieBaseFiles;
      }

  10. wangmeng Could you please open a PR so that I can review this appropriately?

  11. We are working on developing the Time Travel ability in our project; here are our demands and design: HUDI-1460. We'd appreciate it if you could have a review of it. Nishith Agarwal