Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Streaming SQL and Flink SQL have already gone through several iterations.

This FLIP aims to fill remaining gaps and to improve the syntax and terminology around the primitives for dynamic tables.

Note:
This FLIP neither aims to fix all weak points nor suggests a complete overhaul of Flink SQL.
It only proposes another step in the language evolution by leveraging the newly introduced PTF feature from FLIP-440: User-defined SQL operators / ProcessTableFunction (PTF) and further insights into Polymorphic Table Functions in the SQL standard.

Historical Context

For context, here are some major design documents that have shaped Flink SQL as we know it today. Major definitions and introduction of concepts are highlighted.

Dynamic Tables

https://nightlies.apache.org/flink/flink-docs-release-2.0-preview1/docs/dev/table/concepts/dynamic_tables/

  • A database table results from a stream of INSERT, UPDATE, and DELETE DML statements, often called a changelog stream.
  • A materialized view is defined as a SQL query. To update the view, queries must continuously process the changelog streams of the view’s base relations.
  • Dynamic tables are the core concept of Flink’s Table API and SQL support for streaming data. In contrast to the static tables that represent batch data, dynamic tables change over time. But just like static batch tables, systems can execute queries over dynamic tables. Querying dynamic tables yields a Continuous Query.

One SQL to Rule Them All: An Efficient and Syntactically Idiomatic Approach to Management of Streams and Tables

https://arxiv.org/pdf/1905.12133

  • Introduction of Time-varying relations
  • Defines traditional SQL as “Queries are on table snapshots”
  • Materialized Views query pointwise over a TVR.

FLIP-132: Temporal Table DDL and Temporal Table Join

https://cwiki.apache.org/confluence/display/FLINK/FLIP-132%3A+Temporal+Table+DDL+and+Temporal+Table+Join

  • A temporal table represents the concept of a (parameterized) view on a changing table that returns the content of the table at a specific point in time.
  • A temporal table contains a set of versioned table snapshots. It can be a changing history table that tracks the changes (e.g. a database changelog) or a changing dimensioned table that materializes the changes (e.g. a database table).
  • Versioned table: If a row in a dynamic table can track its change history and its historical versions can be accessed, we call this kind of dynamic table a versioned table.
  • Regular table: In a regular table, a row in the dynamic table only tracks its latest version. The table in a lookup join can also only track its latest version, so it is a regular table as well.
  • The most common scenario for a temporal table is a correlated access to a specific version of the data in a temporal table join:
-- Event-time temporal table join
SELECT
  o.order_id,
  o.order_time,
  o.amount * r.rate AS amount,
  r.currency
FROM orders AS o
JOIN versioned_rates FOR SYSTEM_TIME AS OF o.order_time AS r
ON o.currency = r.currency;

-- Processing-time temporal table join
SELECT
  o.order_id,
  o.proctime,
  o.amount * r.rate AS amount,
  r.currency
FROM orders AS o
JOIN latest_rates FOR SYSTEM_TIME AS OF o.proctime AS r
ON o.currency = r.currency;

FLIP-308: Support Time Travel

https://cwiki.apache.org/confluence/display/FLINK/FLIP-308%3A+Support+Time+Travel

  • Time travel is a SQL syntax for querying historical versions of data that allows users to specify a point in time and retrieve the data and schema of a table as it appeared at that time.

  • Users can easily analyze and compare historical versions of data.

  • Query the data for a past moment:

SELECT * FROM table_name FOR SYSTEM_TIME AS OF TIMESTAMP '[timestamp]'

SELECT * FROM table_name FOR SYSTEM_TIME AS OF [expression]

  • Query the data for the current moment:

SELECT * FROM table_name FOR SYSTEM_TIME AS OF CURRENT_TIMESTAMP

Areas for improvement

Flink SQL is already very powerful. However, there are some weak points in the design that this FLIP aims to tackle:

Point in time results

  • Executing a SELECT * FROM t; against a database results in a snapshot (i.e. point-in-time) result.

  • However, Flink SQL subscribes to t, then consumes and applies its changelog. Thus, it implicitly triggers materialized view maintenance.

  • Currently, a user needs to run SELECT * FROM t FOR SYSTEM_TIME AS OF NOW() in order to get database-like semantics.
  • Flink SQL is not standard compliant in this regard.

Interpretation of NOW()

  • In FLIP-162, the community agreed that NOW(), CURRENT_TIMESTAMP() and other time functions are evaluated per-row in streaming mode. They are not locked at query planning time:
    “Flink evaluates above time function values according to execution mode, i.e. Flink evaluates time function value for row level in Streaming mode, evaluates the time function value at query start for batch mode.”

  • Flink SQL is not standard compliant in this regard.

  • Thus, time travelling (FLIP-308) via SELECT * FROM t FOR SYSTEM_TIME AS OF NOW() and executing SELECT NOW() FROM (VALUES 1, 2, 3) seem to contradict each other.
  • Especially in a join, it is not obvious that the left NOW() is evaluated per row while the right NOW() is evaluated once per query:

SELECT *, NOW() FROM t1, t2 FOR SYSTEM_TIME AS OF NOW();
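The difference between the two evaluation modes can be illustrated with a small Python sketch. It is not Flink code: `make_clock`, `select_now_per_row`, and `select_now_per_query` are hypothetical helpers, and the clock is a deterministic counter that merely stands in for wall-clock time.

```python
from itertools import count


def make_clock(start=100):
    """Hypothetical deterministic clock: every call returns the next tick."""
    ticks = count(start)
    return lambda: next(ticks)


def select_now_per_row(rows, now):
    # Streaming-mode behavior described in FLIP-162: NOW() is evaluated for every row.
    return [(row, now()) for row in rows]


def select_now_per_query(rows, now):
    # Batch-mode behavior: NOW() is evaluated once at query start and reused.
    query_start = now()
    return [(row, query_start) for row in rows]


# Roughly corresponds to SELECT NOW() FROM (VALUES 1, 2, 3) in each mode.
per_row = select_now_per_row([1, 2, 3], make_clock())
per_query = select_now_per_query([1, 2, 3], make_clock())
print(per_row)    # [(1, 100), (2, 101), (3, 102)]
print(per_query)  # [(1, 100), (2, 100), (3, 100)]
```

In the join above, both behaviors appear in a single statement, which is what makes the query hard to reason about.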

Overloading of FOR SYSTEM_TIME AS OF

  • Currently, FOR SYSTEM_TIME AS OF is heavily overloaded:

-- time travel
SELECT * FROM t1, t2 FOR SYSTEM_TIME AS OF NOW();
SELECT * FROM t1, t2 FOR SYSTEM_TIME AS OF TIMESTAMP '2012-12-12 12:34:56';

-- time-versioned tables and temporal joins
SELECT * FROM t1, t2 FOR SYSTEM_TIME AS OF t1.rowtime;

-- not a temporal join, but a lookup join
SELECT * FROM t1, t2 FOR SYSTEM_TIME AS OF t1.proctime;
-- lookup join using the confusing PROCTIME() function
SELECT * FROM t1, t2 FOR SYSTEM_TIME AS OF PROCTIME();
  • Explaining the different semantics is difficult when the same syntax FOR SYSTEM_TIME AS OF is used with only slightly different parameters.

  • The difference between lookup joins and temporal joins is not straightforward and requires knowledge about the underlying data source of t2 in this example. If t2 is not a lookup source, the query will fail during planning.

  • The SQL standard only allows NOW() or constant expressions like TIMESTAMP '2012-12-12 12:34:56' in FOR SYSTEM_TIME AS OF. Thus, Flink SQL is not standard compliant when correlating on a time attribute such as t1.rowtime or t1.proctime.

Working with Changelogs

  • All early documents mention the concept of a changelog and changelog to table conversion.

  • However, there are no built-in functions to convert from or to a changelog.

  • Also, there are no functions that combine the concept of a changelog with the primitives around point-in-time results, time travel, and time versioning.

Public Interfaces


SNAPSHOT()
SEARCH_KEY()
TO_CHANGELOG()
FROM_CHANGELOG()

Proposed Changes

This FLIP proposes:

Overview

Requirement:

  • Scan the table once as of now

  • Snapshot at the current point or moment in time

  • Bounded version of an unbounded table backed by an unbounded changelog stream.

Proposed syntax:

SNAPSHOT(t)

Existing syntax:

FOR SYSTEM_TIME AS OF NOW()

Requirement:

  • Scan the table once at a different time

  • Travel in time

  • Allow only deterministic arguments
    e.g. no NOW() nor NOW() - INTERVAL...

Proposed syntax:

SNAPSHOT(t, TIMESTAMP 'x')

Existing syntax:

FOR SYSTEM_TIME AS OF TIMESTAMP 'x'

Requirement:

  • Search for a key in a table once

  • Use a correlated PTF that is semantically easier to explain than PROCTIME(), as LATERAL can be viewed as a kind of foreach loop that executes a function for every t1.name.

  • Later we might add SEARCH_VECTOR(t, …) or SEARCH_TEXT(t, …)

Proposed syntax:

SELECT *
FROM
  t1,
  LATERAL SEARCH_KEY(t2, DESCRIPTOR(k), t1.name)

Existing syntax:

SELECT *
FROM
  t1,
  t2 FOR SYSTEM_TIME AS OF PROCTIME()
WHERE t1.name = t2.k;

Requirement:

  • Following the LATERAL approach for SEARCH, the SNAPSHOT function can be generalized to temporal joins.

  • Avoids the need for FOR SYSTEM_TIME AS OF entirely

  • According to the standard, FOR SYSTEM_TIME AS OF should not be correlated.

Proposed syntax:

SELECT *
FROM
  t1,
  LATERAL SNAPSHOT(t2, t1.rowtime)

Existing syntax:

SELECT *
FROM
  t1,
  t2 FOR SYSTEM_TIME AS OF t1.rowtime;

Requirement:

  • View the table as a changelog

  • Allows for converting an updating table (retract or upsert) to an append table.

  • Note that this functionality is not new. It is already present in Flink APIs but not accessible to SQL users.

Proposed syntax:

TO_CHANGELOG(t)

Existing syntax:

missing

Requirement:

  • View the changelog as a table

  • Opens the engine for advanced CDC use cases without having to use the DataStream API or a custom connector.

  • Note that this functionality is not new. It is already present in Flink APIs but not accessible to SQL users.

Proposed syntax:

FROM_CHANGELOG(t)

Existing syntax:

missing

SNAPSHOT for point-in-time

Example:

-- Most compact invocation
SELECT * FROM SNAPSHOT(t);

-- Usage in joins
SELECT * FROM SNAPSHOT(t1) AS a JOIN SNAPSHOT(t2) AS b ON a.k = b.k;

-- Usage with named arguments
SELECT * FROM SNAPSHOT(input => t);

Notes:

  • Semantically, this syntax is equal to the current implementation of FOR SYSTEM_TIME AS OF NOW() introduced by FLIP-308: Depending on the catalog, a bounded table is returned.

  • It avoids the problematic use of NOW() as explained above.

  • It is less verbose and easy to teach.

  • From a syntax perspective, a function allows for more parameterization in the future, instead of relying on the complex syntax of SQL hints.

  • Syntax is standard compliant and can be remodeled in every database that supports PTFs.

SNAPSHOT for time travel

Example:

-- Most compact invocation
SELECT * FROM SNAPSHOT(t, TIMESTAMP '2024-01-20');

-- Usage with named arguments
SELECT * FROM SNAPSHOT(input => t, to_time => TIMESTAMP '2024-01-20');

Notes:

  • Semantically, this syntax is equal to the current implementation of FOR SYSTEM_TIME AS OF TIMESTAMP 'x' introduced by FLIP-308: Depending on the catalog, a bounded table is returned.

  • However, we only allow deterministic function calls in arguments.

  • It avoids the problematic use of NOW() arithmetic as explained above.

  • It is less verbose and easy to teach.

  • From a syntax perspective, a function allows for more parameterization in the future, instead of relying on the complex syntax of SQL hints.

  • E.g. we could introduce a from_time argument as well making it:

SNAPSHOT(input => t, from_time => TIMESTAMP '2024-01-20', to_time => TIMESTAMP '2024-01-20')

This would enable time travel not only on the upper bound but also on the lower bound.

More inspiration for arguments can be found e.g. here: https://docs.snowflake.com/en/sql-reference/constructs/at-before
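To make the snapshot semantics concrete, here is a minimal Python sketch, not Flink code. It assumes that a table is backed by a changelog of `(timestamp, op, row)` entries ordered by timestamp and keyed by one column. The function `snapshot` and the sample data are hypothetical; only the op codes follow RowKind.toByte().

```python
# Op codes as returned by RowKind.toByte().
INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE = 0, 1, 2, 3


def snapshot(changelog, key, to_time=None):
    """Materialize the rows visible at to_time (or after the last change if None).

    Assumption: `changelog` is ordered by timestamp and `key` names a unique key column.
    """
    table = {}
    for ts, op, row in changelog:
        if to_time is not None and ts > to_time:
            break  # later changes are not visible in this snapshot
        if op in (INSERT, UPDATE_AFTER):
            table[row[key]] = row
        elif op == DELETE:
            table.pop(row[key], None)
        # UPDATE_BEFORE carries the old row; the following UPDATE_AFTER replaces it.
    return sorted(table.values(), key=lambda r: r[key])


changelog = [
    (1, INSERT, {"k": "EUR", "rate": 110}),
    (2, INSERT, {"k": "GBP", "rate": 130}),
    (3, UPDATE_BEFORE, {"k": "EUR", "rate": 110}),
    (3, UPDATE_AFTER, {"k": "EUR", "rate": 120}),
    (4, DELETE, {"k": "GBP", "rate": 130}),
]

print(snapshot(changelog, "k"))             # like SNAPSHOT(t)
print(snapshot(changelog, "k", to_time=2))  # like SNAPSHOT(t, TIMESTAMP 'x')
```

Both calls return a bounded result. A `from_time` argument, as floated above, would add a lower bound to the same loop.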

Correlated SEARCH functions

Example:

-- Most common usage with a single key
SELECT * FROM t_other, LATERAL SEARCH_KEY(t, DESCRIPTOR(k), t_other.name);

-- Usage with named arguments
SELECT *
FROM
  t_other,
  LATERAL SEARCH_KEY(
    input => t,
    on_key => DESCRIPTOR(k),
    lookup => t_other.name
  );

-- Usage with configuration options    
SELECT *
FROM
  t_other,
  LATERAL SEARCH_KEY(
    input => t,
    on_key => DESCRIPTOR(k),
    lookup => t_other.name,
    options => MAP[
      'async', 'true',
      'retry-predicate', 'lookup_miss',
      'retry-strategy', 'fixed_delay',
      'fixed-delay', '10s'
    ]
  );

-- Usage with composite keys
-- Values are passed as a ROW type using parentheses.
SELECT *
FROM 
  t_other,
  LATERAL SEARCH_KEY(
    input => t,
    on_key => DESCRIPTOR(k1, k2),
    lookup => (t_other.k1, t_other.k2)
  );
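The foreach reading of LATERAL can be sketched in Python as follows. This is a simplified model, not Flink code: `search_key` and `lateral_join` are hypothetical stand-ins, the tables are in-memory lists of dicts, and inner-join semantics are assumed (rows without a match are dropped).

```python
def search_key(table, on_key, lookup):
    """Hypothetical stand-in for SEARCH_KEY: rows of `table` whose key columns equal `lookup`."""
    # A single key may be passed as a scalar; composite keys are passed as a tuple.
    lookup = lookup if isinstance(lookup, tuple) else (lookup,)
    return [row for row in table if tuple(row[c] for c in on_key) == lookup]


def lateral_join(left, fn):
    # LATERAL as a foreach loop: call fn once per left row and pair up the results.
    return [{**l, **r} for l in left for r in fn(l)]


t = [{"k": "alice", "city": "Berlin"}, {"k": "bob", "city": "Paris"}]
t_other = [{"name": "bob"}, {"name": "carol"}, {"name": "alice"}]

# SELECT * FROM t_other, LATERAL SEARCH_KEY(t, DESCRIPTOR(k), t_other.name)
result = lateral_join(t_other, lambda row: search_key(t, ("k",), row["name"]))
print(result)
```

The row for `carol` has no match in `t` and therefore does not appear in the result.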

Notes:

SNAPSHOT for temporal joins

Example:

-- Most compact invocation
SELECT * FROM t_other, LATERAL SNAPSHOT(t, t_other.rowtime);

-- Usage with named arguments
SELECT * FROM t_other, LATERAL SNAPSHOT(input => t, to_time => t_other.rowtime);

Notes:

  • Semantically, this syntax is equal to the current implementation of FOR SYSTEM_TIME AS OF t_other.rowtime introduced by FLIP-132: Temporal Table DDL and Temporal Table Join.

  • An uncorrelated call to SNAPSHOT results in a one-time snapshot. Thus, SELECT * FROM t_other, LATERAL SNAPSHOT(t) is semantically equal to SELECT * FROM t_other, SNAPSHOT(t).
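A correlated snapshot can be pictured as a per-row version lookup. The following Python sketch is an illustration under stated assumptions, not Flink code: the versioned table is modeled as a dict from key to a list of `(valid_from, rate)` pairs sorted by `valid_from`, the key match on `currency` is added here for the example, and `version_at` and `temporal_join` are hypothetical names.

```python
from bisect import bisect_right


def version_at(versions, to_time):
    """Return the value valid at to_time, or None if no version exists yet.

    Assumption: `versions` is a list of (valid_from, value) sorted by valid_from.
    """
    starts = [valid_from for valid_from, _ in versions]
    i = bisect_right(starts, to_time)
    return versions[i - 1][1] if i else None


def temporal_join(orders, rates):
    # For every order, take the snapshot of `rates` as of that order's rowtime.
    out = []
    for o in orders:
        rate = version_at(rates.get(o["currency"], []), o["rowtime"])
        if rate is not None:  # inner-join semantics: drop orders without a version
            out.append({"id": o["id"], "amount": o["amount"] * rate})
    return out


rates = {"EUR": [(1, 110), (5, 120)]}
orders = [
    {"id": 1, "currency": "EUR", "amount": 2, "rowtime": 3},
    {"id": 2, "currency": "EUR", "amount": 2, "rowtime": 7},
    {"id": 3, "currency": "EUR", "amount": 2, "rowtime": 0},
]

joined = temporal_join(orders, rates)
print(joined)  # [{'id': 1, 'amount': 220}, {'id': 2, 'amount': 240}]
```

Order 3 arrives before the first rate version became valid, so it finds no match.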

TO_CHANGELOG/FROM_CHANGELOG for stream/table duality

Example:

SELECT * FROM TO_CHANGELOG(t) AS s;

SELECT * FROM FROM_CHANGELOG(s) AS t;

Notes:

  • Semantically, this syntax is equal to the current implementation of toChangelogStream(Table) and fromChangelogStream(DataStream<Row>).

  • By default, both functions work with retract update encoding which is the most flexible/most generic changelog mode.

  • From a syntax perspective, a function allows for more parameterization in the future.

  • E.g. we could support upsert via a parameter on_key (naming similar to SEARCH_KEY) such as

SELECT * FROM TO_CHANGELOG(input => t, on_key => DESCRIPTOR(k));

  • Once Flink SQL supports lambda expressions, we could even pass a watermark strategy together with an on_time argument (naming similar to FLIP-440) in FROM_CHANGELOG and thus remodel a CREATE TABLE statement. But this is not part of this FLIP.

  • The schema of TO_CHANGELOG and FROM_CHANGELOG must be symmetric and is defined as:

Column | Data Type | Value
op | TINYINT | Similar naming and output as a Table API .print(). Contains the RowKind.toByte() value.
payload | ROW<…> | Columns of t.

The payload column is a nested column for quickly filtering by op and selecting the payload like:

SELECT payload.* FROM TO_CHANGELOG(t) WHERE op = 0
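The symmetric (op, payload) schema and the default retract encoding can be illustrated with a short Python sketch. It is not Flink code: `from_changelog` is a hypothetical helper that models the table as a list (multiset) of payload tuples, and the sample rows are made up. The op codes follow RowKind.toByte().

```python
# Op codes as returned by RowKind.toByte().
INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE = 0, 1, 2, 3


def from_changelog(changelog):
    """Apply retract-encoded (op, payload) rows and return the resulting table."""
    table = []
    for op, payload in changelog:
        if op in (INSERT, UPDATE_AFTER):
            table.append(payload)
        else:
            # UPDATE_BEFORE and DELETE retract a row that was emitted earlier.
            table.remove(payload)
    return table


# What TO_CHANGELOG(t) might emit for a table with columns (currency, rate).
s = [
    (INSERT, ("EUR", 110)),
    (INSERT, ("GBP", 130)),
    (UPDATE_BEFORE, ("EUR", 110)),
    (UPDATE_AFTER, ("EUR", 120)),
    (DELETE, ("GBP", 130)),
]

# Mirrors SELECT payload.* FROM TO_CHANGELOG(t) WHERE op = 0
inserts_only = [payload for op, payload in s if op == INSERT]
print(inserts_only)       # [('EUR', 110), ('GBP', 130)]
print(from_changelog(s))  # [('EUR', 120)]
```

Because retractions carry the full old row, no key is needed to apply them. An upsert encoding would need a key, which is what the on_key parameter mentioned above would provide.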

Implementation Details

Most of the proposed changes are syntax changes. The user-defined PTF stack won't be sufficient for complex input strategies with varargs and overloading. We will need special rules and custom Calcite SqlOperators.

Compatibility, Deprecation, and Migration Plan

The old syntax will be deprecated but will remain usable at least until the next major Flink version.

Test Plan

Plan and semantic tests will be added.

Rejected Alternatives

  • Introduction of new keywords:
    → would not be standard compliant and hard to implement (i.e. Calcite changes in SELECT)
  • Mixture of PTFs and WHERE clause
    → would not be standard compliant