...

Manually trigger a refresh of a dynamic table. The command applies only to the current table and, by default, does not trigger a cascading refresh of downstream tables.

Note: A manual refresh command and the background refresh job can run in parallel; the framework does not impose any limitations on this. However, if the refresh mode is continuous and the background job is running, use the refresh command with care, because it can lead to inconsistent data.
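
As a rough illustration, a manual refresh might look like the statements below. The ALTER DYNAMIC TABLE ... REFRESH form and the optional PARTITION clause are assumptions, since the command syntax is defined in a part of this document that is not shown here; dws_deals and the partition value are placeholders.

```sql
-- Assumed syntax: refresh only the named table; downstream tables are not refreshed.
ALTER DYNAMIC TABLE dws_deals REFRESH;

-- Assumed syntax: restrict the refresh to a single partition (placeholder value).
ALTER DYNAMIC TABLE dws_deals REFRESH PARTITION (ds = '2024-01-01');
```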

...

We will discuss the interfaces for integrating with various open source schedulers in a separate FLIP. Support for specific schedulers such as Airflow and DolphinScheduler will be implemented and maintained in a separate repository (similar to the connector repos).

...

  1. Dynamic Table is a new conceptual entity and table type that also includes attributes such as query, freshness and refresh job, which makes a new interface more appropriate.
  2. If CatalogTable adds a new attribute that is not supported by Dynamic Table, it will lead to a conflict, so it is more appropriate to represent it using an interface that is parallel to CatalogTable.

Future Improvements

Temporary View Limitation

Both the dynamic table and the query it references need to be persistent. A temporary view exists only locally, is effective only for the current execution, and cannot be persisted, so it is not supported. If there is a real need for a temporary view, there are currently two workarounds: use a View, or use a Common Table Expression (CTE)[7], which is equivalent in effect to a temporary view. Support for temporary views may be added in the future, depending on user demand and technical feasibility.

Code Block
languagesql
CREATE DYNAMIC TABLE sink
PARTITIONED BY (ds)
FRESHNESS = INTERVAL '3' MINUTE
AS
-- use cte
WITH cte_source AS (
  SELECT
    user_id,
    shop_id,
    FROM_UNIXTIME(ds, 'yyyy-MM-dd') AS ds,
    SUM(payment_amount_cents) AS payed_buy_fee_sum,
    SUM(1) AS pv
  FROM c_source
  GROUP BY user_id, shop_id, FROM_UNIXTIME(ds, 'yyyy-MM-dd')
)
SELECT * FROM cte_source;
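
The other workaround, a persistent View, could be sketched as follows. It reuses the source table and columns of the CTE example; the view name v_source is hypothetical, and CREATE VIEW is assumed to create a catalog (non-temporary) view.

```sql
-- Persistent view (hypothetical name v_source) that holds the aggregation logic
CREATE VIEW v_source AS
SELECT
  user_id,
  shop_id,
  FROM_UNIXTIME(ds, 'yyyy-MM-dd') AS ds,
  SUM(payment_amount_cents) AS payed_buy_fee_sum,
  SUM(1) AS pv
FROM c_source
GROUP BY user_id, shop_id, FROM_UNIXTIME(ds, 'yyyy-MM-dd');

-- The dynamic table then references the persistent view instead of a temporary one
CREATE DYNAMIC TABLE sink
PARTITIONED BY (ds)
FRESHNESS = INTERVAL '3' MINUTE
AS SELECT * FROM v_source;
```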

Modify Select Statement Limitation

Strictly speaking, the current design of Dynamic Table does not support altering the query statement. In a lakehouse, however, schema evolution commonly requires adding fields, which is a legitimate requirement that we intend to support in the future. One possible solution is the syntax ALTER DYNAMIC TABLE xxx ADD query_statement. At the framework level, we would optimize by merging the multiple query statements that write to the same Dynamic Table.

Code Block
languagesql
-- create a dynamic table with two aggregate metrics
CREATE DYNAMIC TABLE dws_deals(
  PRIMARY KEY (order_id, ds) NOT ENFORCED
)
PARTITIONED BY (ds)
FRESHNESS = INTERVAL '1' HOUR
AS SELECT
  o.ds,
  o.order_id,
  count(*) order_cnt,
  sum(pay_amt) order_total_amt
FROM 
  dwd_orders as o
GROUP BY o.order_id, o.ds;

-- add a statement with two new aggregate metrics
ALTER DYNAMIC TABLE dws_deals
ADD 
SELECT
  o.ds,
  o.order_id,
  count(distinct order_id) dedup_order_cnt, -- deduplicated order cnt
  sum(goods_cnt) total_goods_cnt -- total goods cnt
FROM 
  dwd_orders as o
GROUP BY o.order_id, o.ds;
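
To make the merge idea concrete, the two statements above read from the same source with the same grouping keys, so their combined effect could in principle be expressed as one query. This is only a sketch of the intended result, not a description of how the framework would perform the merge.

```sql
-- Hypothetical merged form: one scan of dwd_orders produces all four metrics
SELECT
  o.ds,
  o.order_id,
  count(*) order_cnt,
  sum(pay_amt) order_total_amt,
  count(distinct order_id) dedup_order_cnt,
  sum(goods_cnt) total_goods_cnt
FROM
  dwd_orders as o
GROUP BY o.order_id, o.ds;
```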


Reference

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/dynamic_tables

...

[6] PoC: https://github.com/lsyldliu/flink/tree/dynamic_table_poc

[7] https://www.postgresql.org/docs/16/queries-with.html