As introduced earlier, in full refresh mode batch scheduling tasks are created based on freshness, and each execution trigger results in a full table refresh. A common scenario in data warehouses is the time-partitioned table, such as a table with daily partitions keyed by 'ds'. If the user expects each execution to refresh only the latest partition, how can this be handled? The following example walks through the implementation details.

Code Block
languagesql
CREATE DYNAMIC TABLE dt_sink
PARTITIONED BY (ds)
WITH(
  --hint time partition format
  'partition.fields.ds.time-formatter' = 'yyyy-MM-dd'
)
FRESHNESS = INTERVAL '1' DAY
AS SELECT
  ds,
  id,
  order_number,
  user_id,
  ...
FROM source

In the given example, both the 'source' and 'dt_sink' tables are partitioned by a 'ds' field. By specifying the format of the time partition field in the WITH clause, the framework is prompted to only refresh the data in the latest 'ds' partition with each batch schedule.

Suppose the freshness is set to '1' DAY, so that scheduling occurs once a day. Assuming today's date is March 2, 2024, when the batch schedule is triggered at midnight, the framework uses the time passed by the scheduling system, combined with the freshness and 'partition.fields.ds.time-formatter', to calculate the 'ds' partition that needs to be refreshed. In this case, it determines the partition to be 'ds=2024-03-01', which corresponds to yesterday's partition. The framework then assembles the following SQL statement and submits it as a Flink batch job.

Code Block
languagesql
INSERT OVERWRITE dt_sink
SELECT * FROM
(SELECT
  ds,
  id,
  order_number,
  user_id,
  ...
  FROM source
) AS tmp
-- add partition filter condition
WHERE ds = '2024-03-01'
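
The partition calculation and statement assembly described above can be sketched in Java as follows. This is a minimal illustration and not the framework's actual code: the class and method names are hypothetical, and it assumes the partition value is obtained by subtracting the freshness interval from the scheduler's trigger time and formatting the result with the configured time-formatter pattern, which is consistent with the March 2 to 'ds=2024-03-01' example.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for illustration only; not part of Flink.
final class PartitionRefreshSketch {

    // Assumption: partition time = scheduler trigger time minus freshness,
    // rendered with the 'partition.fields.<field>.time-formatter' pattern.
    static String partitionToRefresh(LocalDateTime scheduleTime,
                                     Duration freshness,
                                     String formatterPattern) {
        return scheduleTime.minus(freshness)
                .format(DateTimeFormatter.ofPattern(formatterPattern));
    }

    // Wraps the table's defining query in a subquery and appends the
    // partition filter, mirroring the shape of the statement shown above.
    static String refreshStatement(String table,
                                   String definitionQuery,
                                   String partitionField,
                                   String partitionValue) {
        return "INSERT OVERWRITE " + table
                + " SELECT * FROM (" + definitionQuery + ") AS tmp"
                + " WHERE " + partitionField + " = '" + partitionValue + "'";
    }
}
```

For a trigger time of midnight on March 2, 2024, a freshness of one day, and the pattern 'yyyy-MM-dd', `partitionToRefresh` yields `2024-03-01`, which `refreshStatement` then places in the WHERE clause.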

Since both the 'source' and 'dt_sink' tables are partitioned by the 'ds' field, the optimizer, when generating the execution plan, will push down the 'ds' filter condition to the 'source' table, thereby only reading data from the corresponding partition of the 'source' table.
