Time travel is SQL syntax for querying historical versions of data: users specify a point in time and retrieve the data and schema of a table as they appeared at that time. With time travel, users can easily analyze and compare historical versions of data.
We propose adding the following syntax for time travel statements, based on the time travel syntax defined in the SQL:2011 standard.
The following syntax supports both batch mode and bounded stream mode.
Query the data for a past moment.
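For illustration, a query of this form might look like the following (the table name `t` is a placeholder):

```sql
-- Read table t as of a past point in time
SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '2023-05-05 00:00:00';
```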
Query the data for the current moment.
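For illustration, assuming `CURRENT_TIMESTAMP` is accepted as the timestamp expression (the table name `t` is a placeholder):

```sql
-- Read table t as of the current moment
SELECT * FROM t FOR SYSTEM_TIME AS OF CURRENT_TIMESTAMP;
```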
Supported Syntax and Types of Timestamp Expression
1. Supported Timestamp type
As shown in the syntax above, time travel queries support specifying a constant time expression, so the type of this expression may be TIMESTAMP or TIMESTAMP_LTZ.
However, the interface reserved for the connector layer is a long-valued timestamp (via a newly added method on Catalog), which means we ultimately need to convert both TIMESTAMP and TIMESTAMP_LTZ expressions into long values.
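The newly added Catalog method referred to above is presumably shaped like the sketch below. The signature matches the getTable(ObjectPath tablePath, long timestamp) call described later in this document; the default body (rejecting time travel) and the stub types are assumptions made only to keep the sketch self-contained.

```java
// Minimal stand-ins for the real Flink catalog types, only to keep this
// sketch self-contained; in Flink they live in org.apache.flink.table.catalog.
class ObjectPath {
    final String databaseName;
    final String objectName;
    ObjectPath(String databaseName, String objectName) {
        this.databaseName = databaseName;
        this.objectName = objectName;
    }
}

interface CatalogBaseTable {}

// Sketch of the new time-travel lookup on the Catalog interface. The
// default body is an assumption for catalogs that do not implement it.
interface Catalog {
    /** Returns the table as it existed at the given point in time (epoch millis). */
    default CatalogBaseTable getTable(ObjectPath tablePath, long timestamp) {
        throw new UnsupportedOperationException("This catalog does not support time travel");
    }
}
```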
A TIMESTAMP_LTZ value converts naturally to a long value. A TIMESTAMP value, however, is first implicitly converted to TIMESTAMP_LTZ (interpreted in the session time zone) and then converted to a long value.
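The conversion chain described above (TIMESTAMP, interpreted in the session time zone, to TIMESTAMP_LTZ, to epoch millis) can be sketched with java.time. The helper name is hypothetical; the planner's actual conversion code differs.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

class TimestampToEpoch {
    // Interpreting the wall-clock TIMESTAMP in the session time zone yields
    // an instant (TIMESTAMP_LTZ), whose epoch millis is the final long value.
    static long toEpochMillis(LocalDateTime timestamp, ZoneId sessionTimeZone) {
        return timestamp.atZone(sessionTimeZone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.parse("2023-05-05T00:00:00");
        // The same wall-clock time maps to different long values depending
        // on the session time zone (Shanghai is UTC+8).
        System.out.println(toEpochMillis(ts, ZoneId.of("UTC")));
        System.out.println(toEpochMillis(ts, ZoneId.of("Asia/Shanghai")));
    }
}
```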
2. Supported Expression Syntax
<TIMESTAMP EXPRESSION> must be a valid and executable expression.
For example, TIMESTAMP '2023-05-05 00:00:00' is supported, but TIMESTAMP_LTZ '2023-05-05 00:00:00' is not, because there is no TIMESTAMP_LTZ literal. To specify a TIMESTAMP_LTZ value, use the TO_TIMESTAMP_LTZ function.
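These cases can be illustrated as follows (the table name `t` is a placeholder; 1683244800000 is the epoch-millis value of 2023-05-05 00:00:00 UTC):

```sql
-- Supported: TIMESTAMP literal
SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '2023-05-05 00:00:00';

-- Not supported: TIMESTAMP_LTZ has no literal form
-- SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP_LTZ '2023-05-05 00:00:00';

-- Supported: build a TIMESTAMP_LTZ value with TO_TIMESTAMP_LTZ
SELECT * FROM t FOR SYSTEM_TIME AS OF TO_TIMESTAMP_LTZ(1683244800000, 3);
```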
Flink SQL related Changes
Here is the PoC for this FLIP: https://github.com/apache/flink/compare/master...hackergin:flink:FLIP-308
I will describe the necessary modifications across the stages of SQL parsing and compilation.
1. Convert SQL String to SqlNode and Validate
At this stage, the table schema is validated. When parsing the SqlSnapshot node, the corresponding `period` (a time constant or an expression) can be evaluated into a timestamp, from which a LongSchemaVersion is constructed to generate a new Calcite schema for schema validation.
2. Convert SqlNode to RelNode
At this stage, the table schema is also validated. When converting the SqlSnapshot node, the timestamp can be extracted from the SqlSnapshot, and a LongSchemaVersion is constructed to generate a new Calcite schema for table validation.
At present, Flink SQL only supports temporal join, and related checks were added to reject unsupported syntax. The time travel implementation should remove these restrictions.
4. Convert RelNode to Transformation (Convert CommonExecTableSourceScan To Transformation)
If a connector needs to support time travel, the corresponding timestamp can be obtained through the getSnapshot interface of CatalogTable when constructing a DynamicTableSource.
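A sketch of a connector factory reading the time-travel timestamp while building its source follows. The getSnapshot() accessor returning Optional&lt;Long&gt; is the interface assumed by this document; the factory and source class names are illustrative, and the types are minimal stand-ins to keep the sketch self-contained.

```java
import java.util.Optional;

// Minimal stand-ins for the Flink planner types; the real interfaces
// live in org.apache.flink.table.connector and org.apache.flink.table.catalog.
interface DynamicTableSource {}

class CatalogTable {
    private final Long snapshot; // epoch millis, or null for a normal latest read
    CatalogTable(Long snapshot) { this.snapshot = snapshot; }
    // The getSnapshot() accessor assumed by this document.
    Optional<Long> getSnapshot() { return Optional.ofNullable(snapshot); }
}

// Illustrative source that captures the requested point in time.
class ExampleTableSource implements DynamicTableSource {
    final Optional<Long> readTimestamp;
    ExampleTableSource(Optional<Long> readTimestamp) { this.readTimestamp = readTimestamp; }
}

class ExampleTableSourceFactory {
    // Mirrors createDynamicTableSource: the time-travel timestamp is read
    // from the catalog table and handed to the source; an empty Optional
    // means a normal, latest-version read.
    DynamicTableSource create(CatalogTable table) {
        return new ExampleTableSource(table.getSnapshot());
    }
}
```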
Integrate with other systems
This section describes how other systems can integrate to support time travel.
Taking Paimon's support for time travel as an example: when implementing the getTable(ObjectPath tablePath, long timestamp) method, Paimon specifies the corresponding consumption timestamp.
Compatibility, Deprecation, and Migration Plan
This is a newly added feature, so there are no compatibility issues.
Whether to support other syntax implementations
Due to the current limitations of Calcite's syntax support, we can only support specifying a single point in time, not a time range. Currently, we can't support the following syntax.
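For example, the range forms defined by SQL:2011 cannot be expressed (the table name `t` is a placeholder):

```sql
-- Not supported: querying a time range (SQL:2011 syntax)
SELECT * FROM t FOR SYSTEM_TIME FROM TIMESTAMP '2023-05-05 00:00:00'
                                  TO TIMESTAMP '2023-05-06 00:00:00';

SELECT * FROM t FOR SYSTEM_TIME BETWEEN TIMESTAMP '2023-05-05 00:00:00'
                                    AND TIMESTAMP '2023-05-06 00:00:00';
```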
Time travel is not only an ability of the source connector; it also needs support from the catalog. Time travel requires not only the data at the corresponding point in time, but also the schema at that point in time.
As a source ability, SupportsTimeTravel would be applied during SQL optimization, which is too late to satisfy the requirement of resolving the historical table schema.
Add SupportsTimeTravel source ability
Here is the brief implementation plan.
1. Add a source ability interface so that connectors can implement time travel capabilities.
2. Add corresponding optimization rules so that during logical plan optimization, the snapshot can be pushed down to the ScanTableSource.
3. To avoid conflicts with temporal join, we should apply the time travel transformation rules before applying the temporal join transformation rules.
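A hypothetical shape for the SupportsTimeTravel ability sketched in this plan follows. The method name and signature are assumptions, modeled on the pattern of other Supports* source abilities such as SupportsFilterPushDown.

```java
// Hypothetical source ability; the method name and signature are assumptions.
interface SupportsTimeTravel {
    /** Applies the requested snapshot time (epoch millis) to the source. */
    void applySnapshot(long snapshotMillis);
}

// A toy source accepting the pushed-down snapshot time.
class SnapshotAwareSource implements SupportsTimeTravel {
    Long snapshotMillis; // null until a snapshot is pushed down

    @Override
    public void applySnapshot(long snapshotMillis) {
        this.snapshotMillis = snapshotMillis;
    }
}
```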