Motivation
Streaming SQL and Flink SQL have already gone through several iterations.
This FLIP aims to further fill missing gaps and improve syntax and terminology around the primitives for dynamic tables.
Note:
This FLIP neither aims to fix all weak points nor suggests a complete overhaul of Flink SQL.
It only proposes another step in the language evolution by building on the PTF feature newly introduced in FLIP-440: User-defined SQL operators / ProcessTableFunction (PTF) and on further insights into Polymorphic Table Functions in the SQL standard.
Historical Context
For context, here are some major design documents that have shaped Flink SQL as we know it today. Major definitions and introduction of concepts are highlighted.
Dynamic Tables
- A database table results from a stream of INSERT, UPDATE, and DELETE DML statements, often called a changelog stream.
- A materialized view is defined as a SQL query. To update the view, queries must continuously process the changelog streams of the view’s base relations.
- Dynamic tables are the core concept of Flink’s Table API and SQL support for streaming data. In contrast to the static tables that represent batch data, dynamic tables change over time. But just like static batch tables, systems can execute queries over dynamic tables. Querying dynamic tables yields a Continuous Query.
One SQL to Rule Them All: An Efficient and Syntactically Idiomatic Approach to Management of Streams and Tables
https://arxiv.org/pdf/1905.12133
- Introduction of Time-varying relations
- Defines traditional SQL as “Queries are on table snapshots”
- Materialized Views query pointwise over a TVR.
FLIP-132: Temporal Table DDL and Temporal Table Join
- Temporal Table represents a concept of a (parameterized) view on a changing table that returns the content of a table at a specific point in time.
- A temporal table contains a set of versioned table snapshots. It can be a changing history table which tracks the changes (e.g. database changelog) or a changing dimension table which materializes the changes (e.g. database table).
- Versioned table: If a row in a dynamic table can track its history of changes and its historical versions can be accessed, we call this kind of dynamic table a versioned table.
- Regular table: In a regular table, a row in the dynamic table can only track its latest version. The table in a lookup join can only track its latest version, so it is also a regular table.
- The most common use of a temporal table is as the correlated side of a temporal table join, where it is used to access a specific version of the data:
-- Event-time temporal table join
SELECT
  o.order_id,
  o.order_time,
  o.amount * r.rate AS amount,
  r.currency
FROM orders AS o
JOIN versioned_rates FOR SYSTEM_TIME AS OF o.order_time r
ON o.currency = r.currency;

-- Processing-time temporal table join
SELECT
  o.order_id,
  o.proctime,
  o.amount * r.rate AS amount,
  r.currency
FROM orders AS o
JOIN latest_rates FOR SYSTEM_TIME AS OF o.proctime r
ON o.currency = r.currency;
FLIP-308: Support Time Travel
https://cwiki.apache.org/confluence/display/FLINK/FLIP-308%3A+Support+Time+Travel
Time travel is a SQL syntax for querying historical versions of data that allows users to specify a point in time and retrieve the data and schema of a table as it appeared at that time.
Users can easily analyze and compare historical versions of data.
- Query the data for a past moment:
SELECT * FROM table_name FOR SYSTEM_TIME AS OF TIMESTAMP '[timestamp]'
SELECT * FROM table_name FOR SYSTEM_TIME AS OF [expression]
- Query the data for the current moment:
SELECT * FROM table_name FOR SYSTEM_TIME AS OF CURRENT_TIMESTAMP
Areas for improvement
Flink SQL is already very powerful. However, there are some weak points in the design that this FLIP aims to tackle:
Point in time results
- Executing a SELECT * FROM t; against a database results in a snapshot (i.e. point-in-time) result. However, Flink SQL subscribes to t and consumes and applies its changelog. Thus, it implicitly triggers materialized view maintenance.
- Currently, a user needs to run SELECT * FROM t FOR SYSTEM_TIME AS OF NOW() in order to get database-like semantics. Flink SQL is not standard compliant in this regard.
Interpretation of NOW()
In FLIP-162, the community agreed that NOW(), CURRENT_TIMESTAMP() and other time functions are evaluated per-row in streaming mode. They are not locked at query planning time:
“Flink evaluates above time function values according to execution mode, i.e. Flink evaluates time function value for row level in Streaming mode, evaluates the time function value at query start for batch mode.”
- Flink SQL is not standard compliant in this regard.
- Thus, time travelling (FLIP-308) via SELECT * FROM t FOR SYSTEM_TIME AS OF NOW() and executing SELECT NOW() FROM (VALUES 1, 2, 3) seem to contradict each other. Especially in a join, it is not obvious that the left NOW() is evaluated per row but the right NOW() is evaluated once per query:
SELECT *, NOW() FROM t1, t2 FOR SYSTEM_TIME AS OF NOW();
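The two evaluation strategies for NOW() can be contrasted with a small Python model. This is only an illustrative sketch and not Flink code: the helper names (make_clock, select_now_per_row, select_now_per_query) are hypothetical, and a deterministic counter stands in for the wall clock.

```python
import itertools


def make_clock(start=1000):
    """Hypothetical stand-in for a wall clock: every call advances by one tick."""
    ticks = itertools.count(start)
    return lambda: next(ticks)


def select_now_per_row(rows, now):
    # Streaming-mode behaviour as described for FLIP-162:
    # the time function is evaluated again for every row.
    return [(row, now()) for row in rows]


def select_now_per_query(rows, now):
    # Batch-mode behaviour: the time function is evaluated once at query start
    # and the same value is reused for all rows.
    query_start = now()
    return [(row, query_start) for row in rows]


rows = [1, 2, 3]
per_row = select_now_per_row(rows, make_clock())
per_query = select_now_per_query(rows, make_clock())
```

In this model, per_row carries a different timestamp for each row while per_query carries a single shared timestamp, which is the mismatch that makes the join example above hard to reason about.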
Overloading of FOR SYSTEM_TIME AS OF
Currently, FOR SYSTEM_TIME AS OF is heavily overloaded:
-- time travel
SELECT * FROM t1, t2 FOR SYSTEM_TIME AS OF NOW();
SELECT * FROM t1, t2 FOR SYSTEM_TIME AS OF TIMESTAMP '2012-12-12 12:34:56';

-- time-versioned tables and temporal joins
SELECT * FROM t1, t2 FOR SYSTEM_TIME AS OF t1.rowtime;

-- not temporal joins, but lookup join
SELECT * FROM t1, t2 FOR SYSTEM_TIME AS OF t1.proctime;

-- lookup join using the confusing PROCTIME() function
SELECT * FROM t1, t2 FOR SYSTEM_TIME AS OF PROCTIME();
- Explaining the different semantics is difficult while using the same syntax FOR SYSTEM_TIME AS OF with slightly different parameters.
- The difference between lookup joins and temporal joins is not straightforward and requires knowledge about the underlying data source of t2 in this example. If t2 is not a lookup source, the query will fail during planning.
- The SQL standard only allows NOW() or constant expressions like TIMESTAMP '2012-12-12 12:34:56' in FOR SYSTEM_TIME AS OF. Thus, Flink SQL is not standard compliant for correlating time t1.rowtime or t1.proctime.
Working with Changelogs
All early documents mention the concept of a changelog and changelog to table conversion.
However, there are no built-in functions to convert from or to a changelog.
There are also no functions that combine the concept of a changelog with the primitives around point-in-time results, time travel, and time versioning.
Public Interfaces
- SNAPSHOT()
- SEARCH_KEY()
- TO_CHANGELOG()
- FROM_CHANGELOG()
Proposed Changes
This FLIP proposes:
- Introducing built-in PTFs that allow accessing a table t from different angles.
- Evolving FLIP-440 by the concept of Correlated PTFs.
- Considering upcoming FLIPs for machine learning that not only require lookups by key but also by vector search or full text search.
Related literature:
https://cloud.google.com/vertex-ai/docs/vector-search/overview
https://learn.microsoft.com/en-us/sql/relational-databases/search/full-text-search?view=sql-server-ver16
Overview
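| Requirement | Proposed syntax | Existing syntax |
|---|---|---|
| Point-in-time results | SELECT * FROM SNAPSHOT(t) | SELECT * FROM t FOR SYSTEM_TIME AS OF NOW() |
| Time travel | SELECT * FROM SNAPSHOT(t, TIMESTAMP '2024-01-20') | SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '2024-01-20' |
| Lookup join | SELECT * FROM t_other, LATERAL SEARCH_KEY(t, DESCRIPTOR(k), t_other.name) | FOR SYSTEM_TIME AS OF PROCTIME() / t.proctime |
| Temporal join | SELECT * FROM t_other, LATERAL SNAPSHOT(t, t_other.rowtime) | FOR SYSTEM_TIME AS OF t_other.rowtime |
| Table to changelog | SELECT * FROM TO_CHANGELOG(t) | missing |
| Changelog to table | SELECT * FROM FROM_CHANGELOG(s) | missing |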
SNAPSHOT for point-in-time
Example:
-- Most compact invocation
SELECT * FROM SNAPSHOT(t);

-- Usage in joins
SELECT * FROM SNAPSHOT(t1) AS a JOIN SNAPSHOT(t2) AS b ON a.k = b.k;

-- Usage with named arguments
SELECT * FROM SNAPSHOT(input => t);
Notes:
- Semantically, this syntax is equal to the current implementation of FOR SYSTEM_TIME AS OF NOW() introduced by FLIP-308: Depending on the catalog, a bounded table is returned.
- It avoids the problematic use of NOW() as explained above.
- It is less verbose and easy to teach.
- From a syntax perspective, a function allows for more parameterization in the future, instead of relying on the complex syntax of SQL hints.
- The syntax is standard compliant and can be remodeled in every database that supports PTFs.
SNAPSHOT for time travel
Example:
-- Most compact invocation
SELECT * FROM SNAPSHOT(t, TIMESTAMP '2024-01-20');

-- Usage with named arguments
SELECT * FROM SNAPSHOT(input => t, to_time => TIMESTAMP '2024-01-20');
Notes:
- Semantically, this syntax is equal to the current implementation of FOR SYSTEM_TIME AS OF TIMESTAMP 'x' introduced by FLIP-308: Depending on the catalog, a bounded table is returned. However, we only allow deterministic function calls in arguments.
- It avoids the problematic use of NOW() arithmetic as explained above.
- It is less verbose and easy to teach.
- From a syntax perspective, a function allows for more parameterization in the future, instead of relying on the complex syntax of SQL hints. E.g. we could introduce a from_time argument as well, making it:
SNAPSHOT(input => t, from_time => TIMESTAMP '2024-01-20', to_time => TIMESTAMP '2024-01-20')
This would enable time travel not only on the upper bound but also on the lower bound.
- More inspiration for arguments can be found e.g. here: https://docs.snowflake.com/en/sql-reference/constructs/at-before
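The intended point-in-time and time-travel semantics of SNAPSHOT can be modeled in a few lines of Python. This is a sketch under stated assumptions, not the proposed implementation: the table is assumed to be keyed, its history is assumed to be available as a timestamp-ordered list of upserts and deletes, and the upper bound to_time is assumed to be inclusive. The function name snapshot and the tuple layout are hypothetical.

```python
def snapshot(history, to_time=None):
    """Return the content of a keyed table as of `to_time` (assumed inclusive).

    `history` is a list of (timestamp, op, key, value) tuples sorted by
    timestamp, where op is "upsert" or "delete". With to_time=None, all
    known changes are applied, which models the plain SNAPSHOT(t) call.
    """
    state = {}
    for ts, op, key, value in history:
        if to_time is not None and ts > to_time:
            break  # later changes are not visible in this snapshot
        if op == "delete":
            state.pop(key, None)
        else:
            state[key] = value
    return state  # a bounded result, not a continuously updated one


history = [
    (1, "upsert", "EUR", 110),
    (2, "upsert", "USD", 100),
    (3, "upsert", "EUR", 115),
    (4, "delete", "USD", None),
]
as_of_2 = snapshot(history, to_time=2)
latest = snapshot(history)
```

Here as_of_2 contains both currencies with their early values, while latest only contains the most recent EUR value. In both cases the result is a fixed set of rows rather than a subscription to further changes.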
Correlated SEARCH functions
Example:
-- Most common usage with a single key
SELECT * FROM t_other, LATERAL SEARCH_KEY(t, DESCRIPTOR(k), t_other.name);

-- Usage with named arguments
SELECT *
FROM
  t_other,
  LATERAL SEARCH_KEY(
    input => t,
    on_key => DESCRIPTOR(k),
    lookup => t_other.name
  );

-- Usage with configuration options
SELECT *
FROM
  t_other,
  LATERAL SEARCH_KEY(
    input => t,
    on_key => DESCRIPTOR(k),
    lookup => t_other.name,
    options => MAP[
      'async', 'true',
      'retry-predicate', 'lookup_miss',
      'retry-strategy', 'fixed_delay',
      'fixed-delay', '10s'
    ]
  );

-- Usage with composite keys
-- Values are passed as a ROW type using parentheses.
SELECT *
FROM
  t_other,
  LATERAL SEARCH_KEY(
    input => t,
    on_key => DESCRIPTOR(k1, k2),
    lookup => (t_other.k1, t_other.k2)
  );
Notes:
- Semantically, this syntax is equal to the current implementation of FOR SYSTEM_TIME AS OF PROCTIME() / t.proctime, including any LOOKUP(…) hints: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#lookup
- It is less verbose and easy to teach.
- The name SEARCH is chosen as a preparation for future FLIPs around machine learning topics like SEARCH_VECTOR(t, …) or SEARCH_TEXT(t, …), which will require different parameters and configuration options.
- The LATERAL introduces the concept of correlated PTFs which not only take tables/models as arguments but also correlated columns from the main input. A nice explanation can be found here: https://stackoverflow.com/questions/28550679/what-is-the-difference-between-lateral-join-and-a-subquery-in-postgresql
  In that sense, the LATERAL can be viewed as a foreach loop that re-executes the PTF for every row on the left side.
- Correlated PTFs are not supported in user-defined functions. This FLIP only proposes them for semantics of built-in PTFs.
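The foreach-loop reading of LATERAL combined with a key lookup can be sketched in Python as follows. This is only a conceptual model: the functions search_key and lateral are hypothetical helpers, tables are plain lists of dictionaries, and a left row without a match is assumed to be dropped, as in an inner join.

```python
def search_key(table, on_key, lookup):
    """Return all rows of `table` whose `on_key` columns equal the `lookup` values."""
    wanted = tuple(lookup)
    return [row for row in table if tuple(row[k] for k in on_key) == wanted]


def lateral(left_rows, correlated_fn):
    """Re-execute `correlated_fn` for every left row and join its output to that row."""
    for left in left_rows:
        for right in correlated_fn(left):
            yield {**left, **right}


t = [
    {"k": "alice", "city": "Berlin"},
    {"k": "bob", "city": "Paris"},
]
t_other = [
    {"name": "alice", "amount": 10},
    {"name": "carol", "amount": 7},
]

# Models: SELECT * FROM t_other, LATERAL SEARCH_KEY(t, DESCRIPTOR(k), t_other.name)
result = list(lateral(t_other, lambda row: search_key(t, ("k",), (row["name"],))))
```

The lambda captures the correlation: it receives the current left row and passes one of its columns as the lookup value, which is what distinguishes a correlated PTF call from an uncorrelated one.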
SNAPSHOT for temporal joins
Example:
-- Most compact invocation
SELECT * FROM t_other, LATERAL SNAPSHOT(t, t_other.rowtime);

-- Usage with named arguments
SELECT * FROM t_other, LATERAL SNAPSHOT(input => t, to_time => t_other.rowtime);
Notes:
- Semantically, this syntax is equal to the current implementation of FOR SYSTEM_TIME AS OF t_other.rowtime introduced by FLIP-132: Temporal Table DDL and Temporal Table Join.
- An uncorrelated call to SNAPSHOT results in a one-time snapshot. Thus, SELECT * FROM t_other, LATERAL SNAPSHOT(t) is semantically equal to SELECT * FROM t_other, SNAPSHOT(t).
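A correlated SNAPSHOT call can be pictured as picking, for every left row, the version of the right table that was valid at that row's time. The Python sketch below models this with a binary search over version start times. It is an illustration under assumptions: the names version_as_of and temporal_join, the per-key version lists, and the integer timestamps and rates are all hypothetical, and left rows without a valid version are assumed to be dropped.

```python
from bisect import bisect_right


def version_as_of(versions, to_time):
    """Return the value valid at `to_time`, or None if no version exists yet.

    `versions` is a list of (valid_from, value) pairs sorted by valid_from.
    """
    start_times = [valid_from for valid_from, _ in versions]
    idx = bisect_right(start_times, to_time)
    return versions[idx - 1][1] if idx > 0 else None


def temporal_join(orders, rates_by_currency):
    """Join each order with the rate version valid at the order's time."""
    for order in orders:
        versions = rates_by_currency.get(order["currency"], [])
        rate = version_as_of(versions, order["order_time"])
        if rate is not None:
            yield {"order_id": order["order_id"], "amount": order["amount"] * rate}


rates_by_currency = {"EUR": [(1, 2), (5, 3)]}  # rate 2 from t=1, rate 3 from t=5
orders = [
    {"order_id": 1, "order_time": 3, "currency": "EUR", "amount": 100},
    {"order_id": 2, "order_time": 6, "currency": "EUR", "amount": 100},
    {"order_id": 3, "order_time": 0, "currency": "EUR", "amount": 100},
]
joined = list(temporal_join(orders, rates_by_currency))
```

Order 1 is converted with the first rate and order 2 with the second, while order 3 predates every version and produces no output in this model.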
TO_CHANGELOG/FROM_CHANGELOG for stream/table duality
Example:
SELECT * FROM TO_CHANGELOG(t) AS s;
SELECT * FROM FROM_CHANGELOG(s) AS t;
Notes:
- Semantically, this syntax is equal to the current implementation of toChangelogStream(Table) and fromChangelogStream(DataStream<Row>).
- By default, both functions work with retract update encoding, which is the most flexible/most generic changelog mode.
- From a syntax perspective, a function allows for more parameterization in the future. E.g. we could support upsert via a parameter on_key (naming similar to SEARCH_KEY) such as
SELECT * FROM TO_CHANGELOG(input => t, on_key => DESCRIPTOR(k));
- Once Flink SQL supports lambda expressions, we could even pass a watermark strategy together with an on_time (naming similar to FLIP-440) in FROM_CHANGELOG and thus remodel a CREATE TABLE statement. But this is not part of this FLIP.
- The schema of TO_CHANGELOG and FROM_CHANGELOG must be symmetric and is defined as:
| Column | Data Type | Value |
|---|---|---|
| op | TINYINT | Similar naming and output as a Table API .print(). |
| payload | ROW<…> | Columns of t. |
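The symmetric op/payload schema with retract encoding can be modeled in Python as shown below. This is a sketch, not the proposed implementation. The numeric op codes follow the byte values of Flink's RowKind (INSERT = 0, UPDATE_BEFORE = 1, UPDATE_AFTER = 2, DELETE = 3), but whether the FLIP uses exactly these values is an assumption here. The function names mirror the proposed PTFs for readability only, and to_changelog is simplified to encode a static table as inserts.

```python
from collections import Counter

# Assumed op codes, mirroring the byte values of Flink's RowKind.
INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE = 0, 1, 2, 3


def from_changelog(changelog):
    """Materialize a retract changelog of (op, payload) pairs into a multiset of rows."""
    table = Counter()
    for op, payload in changelog:
        if op in (INSERT, UPDATE_AFTER):
            table[payload] += 1
        elif op in (UPDATE_BEFORE, DELETE):
            table[payload] -= 1
            if table[payload] <= 0:
                del table[payload]  # retractions of unknown rows are ignored
        else:
            raise ValueError(f"unknown op code: {op}")
    return table


def to_changelog(table):
    """Encode the current content of a table as an insert-only changelog."""
    return [(INSERT, payload) for payload, n in sorted(table.items()) for _ in range(n)]


changelog = [
    (INSERT, ("alice", 10)),
    (INSERT, ("bob", 5)),
    (UPDATE_BEFORE, ("alice", 10)),
    (UPDATE_AFTER, ("alice", 12)),
    (DELETE, ("bob", 5)),
]
table = from_changelog(changelog)
round_trip = from_changelog(to_changelog(table))
inserts_only = [payload for op, payload in changelog if op == INSERT]
```

Because every change carries the full row, the retract encoding needs no key to apply updates and deletes, which is why it works as the most generic default. The last line corresponds to filtering the changelog by op and selecting only the payload.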
The payload column is a nested column for quickly filtering by op and selecting the payload like:
SELECT payload.* FROM TO_CHANGELOG(t) WHERE op = 0
Implementation Details
Most of the proposed changes are syntax changes. The user-defined PTF stack won't be sufficient for complex input strategies with varargs and overloading. We will need special rules and custom Calcite SqlOperators.
Compatibility, Deprecation, and Migration Plan
The old syntax will be deprecated but will remain usable at least until the next major Flink version.
Test Plan
Plan and semantic tests will be added.
Rejected Alternatives
- Introduction of new keywords
→ would not be standard compliant and would be hard to implement (i.e. Calcite changes in SELECT)
- Mixture of PTFs and WHERE clause
→ would not be standard compliant