Discussion thread
Vote thread

FLINK-14320 - Getting issue details... STATUS


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


In Flink 1.9, we already introduced a basic SQL DDL to create a table. However, it doesn’t support to define time attributes in SQL DDL yet. Time attribute is a basic information required by time-based operations such as windows in both Table API and SQL. That means, currently, users can’t apply window operations on the tables created by DDL. Meanwhile, we received a lot of requirements from the community to define event time in DDL since 1.9 released. It will be a great benefit to support define time attribute in SQL.

Public Interfaces

This is a list of new interfaces in SQL DDL. Please refer to “Proposed Changes” section for details.

Watermark Syntax

CREATE TABLE table_name (
  WATERMARK FOR columnName AS <watermark_strategy_expression>
) WITH (

NOTE: For “columnName”, it can be a nested column which can use dot syntax.

Proctime Attribute Syntax

CREATE TABLE table_name (
) WITH (

Proposed Changes

Flink Table API & SQL handles time attribute as a field in the schema, not a system or hidden field. This allows for unified syntax for table programs in both batch and streaming environments. The only difference between time attribute field and regular field is that we have a special mark on the field type. The proposed syntax also follows this concept.

Rowtime Attribute

A rowtime attribute must be an existing column in the table schema with TIMESTAMP type. An existing field will be marked as rowtime attribute once a watermark is defined on it. So what we need is a watermark definition to declare 1) the rowtime field, 2) the watermark strategy.

The following is the proposed watermark definition.

CREATE TABLE table_name (
  WATERMARK FOR columnName AS <watermark_strategy_expression>
) WITH (

NOTE: For “columnName”, it can be a nested column which can use dot syntax.

We would like to treat WATERMARK similar to a PRIMARY KEY or UNIQUE KEY 

constraint and therefore keep it in the schema definition. 

The WATERMARK definition combines the watermark strategy and rowtime field selection (i.e. which existing field used to generate watermark) in one clause, so that we can define multiple watermarks in one table in the future.

The “WATERMARK” keyword starts a watermark definition. The “FOR” keyword defines which existing field is marked as rowtime attribute, this field should already exist in the schema.  

The “AS” keyword defines watermark strategy. It allows arbitrary expression to calculate watermark. The returned value is the watermark. The return type can be a nullable BIGINT or TIMESTAMP(3). 

Watermarks are defined as longs that represent the milliseconds since the Epoch (midnight, January 1, 1970 UTC). The returned watermark will be emitted only if it is non-null and its timestamp is larger than that of the previously emitted watermark (to preserve the contract of ascending watermarks). The watermark generation expression is called by the system for every record. The system will periodically emit the largest generated watermark. If the current watermark is still identical to the previous one, or is null, or the timestamp of the returned watermark is smaller than that of the last emitted one, then no new watermark will be emitted.

The interval in which the watermark is emitted depends on ExecutionConfig#getAutoWatermarkInterval(). We will treat `autoWatermarkInterval==0` as the PUNCTUATED mode. In this mode, every generated watermark will be emitted only if it is not null and greater than the last emitted one. This allows us avoid introducting additional keyword to support AssignerWithPunctuatedWatermarks behavior.

For common cases, we can provide some suggested and easy-to-use ways to define commonly used watermark strategies. Such as:

  • Bounded Out of Orderness: rowtimeField - INTERVAL 'string' timeUnit
  • Preserve Watermark From Source: SYSTEM_WATERMARK()

Bounded Out of Orderness

A bounded out of orderness watermark strategy is used to emit Watermarks that lag behind the element with event-time timestamp by a fixed amount of time. 

In this case, the watermark can be generated by the timestamp of the element minus the specified late time. For example:

CREATE TABLE kafka_source (
  user_id STRING,
  log_ts TIMESTAMP(3),
  ingestion_ts TIMESTAMP(3),
) WITH (

It defines a watermark for log_ts rowtime field, and the watermark is generated as rowtime - 5 seconds.

Preserve Watermark From Source

The FROM-SOURCE watermark descriptor preserves the assigned watermarks from the underlying source implementation. In SQL DDL, the proposed grammar is using a built-in SYSTEM_WATERMARK() function:

  rowtime AS SYSTEM_ROWTIME(),
) WITH (

NOTE: If the watermark strategy is SYSTEM_WATERMARK(), then the corresponding rowtime field must be derived using SYSTEM_ROWTIME(). Otherwise an exception will be thrown by the framework.

The SYSTEM_ROWTIME() is a built-in function and can only be used in DDL as a computed column. It means it uses timestamp of the StreamRecord, which is generated by SourceContext#collectWithTimestamp(element, timestamp). 

The SYSTEM_WATERMARK() is a built-in function too, and can only be used as a watermark strategy expression. It means the watermark is generated by the source system itself and the framework should preserve the assigned watermark from source instead generate a new one. 

SYSTEM_ROWTIME() and SYSTEM_WATERMARK() correspond to Rowtime#timestampsFromSource() and Rowtime#watermarksFromSource() in descriptor.

So, it’s easy to introduce this strategy in DDL now, because we can convert them into properties as the same as Rowtime descriptor. Connector factories can check it if they support it and throw an unsupported exception otherwise.

Complex watermark strategies

Users can also use a user defined ScalarFunction to support more complex watermark strategy. 

For example, assuming the log_ts field describes when the record was created, and some records carry a flag, marking them as the end of a sequence such that no elements with smaller timestamps can come anymore. Then we can implement a simple UDF to emit watermark depends on log_ts and flag fields.

  log_ts TIMESTAMP(3),
  flag BOOLEAN,
  WATERMARK FOR log_ts AS watermarkOnFlag(log_ts, flag)
) WITH (
public class WatermarkOnFlag extends ScalarFunction {
  Long eval(long rowtime, boolean isEndOfSequence) {
    return isEndOfSequence ? rowtime : null;

This makes defining watermarks as flexible as PunctuatedWatermarkAssigner in DataStream.

Limitation: For more advanced strategies, e.g. a histogram-based approach would need to remember the values of the last x records. The interface of a scalar function would still work for that, but it would be a stateful function (we don’t support stateful scalar function yet). That means we still can use scalar function to implement it but the next watermark maybe wrong after failover. This needs further discussions, if we want to support such advanced strategies. 

Dropping ASCENDING watermark strategy

A watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp are not late.

I would like to avoid introducing ASCENDING watermark strategy, because it is a confusing keyword. The “AS” part describes what is the watermark generation strategy, but what is Ascending? 

The ascending watermark? A watermark of course should be ascending. 

The ascending rowtime? Why a watermark strategy part need to describe rowtime character.

An ASCENDING watermark strategy is usually used in testing and can also be represented by “rowtimeField - ‘0.001’ SECOND”.

So I think we should be careful to add ASCENDING strategy.

Expected rowtime is nested in schema?

It’s fine if the expected rowtime field is nested, the watermark syntax allows to access a nested field using dot syntax. For example:

CREATE TABLE kafka_source (
  order_id STRING,
  event ROW<log_ts TIMESTAMP(3), message STRING, type INT>,
  WATERMARK FOR event.log_ts AS event.log_ts - '5' SECOND
) WITH (

Expected rowtime field does not exist in the schema?

It is a common case that the expected rowtime field needs to be derived from existing fields, e.g. the original timestamp is a string or a bigint value represents a UTC timestamp, or nested in a JSON. That’s why we provided TimestampExtractor in descriptors to extract timestamp.

In SQL DDL, we would like to leverage computed-column syntax to derive from existing fields using built-in functions or user defined functions. Computed column is a widely used feature in DataBase systems, e.g. MySQL, MS Server, Oracle.  With computed-column, it can make defining a rowtime attributes transparent and simple and can cover TimestampExtractor

For example:

CREATE TABLE kafka_source (
  order_id STRING,
  ts STRING,
  -- to_timestamp can be a built-in function or UDF
  rowtime AS to_timestamp(ts), 
  WATERMARK FOR rowtime AS rowtime - '5' SECOND
) WITH (

NOTE: the design of computed column is discussed in FLIP-70.

Rejeceted Alternative Syntax

1. Reject to introduce BOUNDED DELAY syntax in the first version.


The BOUNDED DELAY syntax can be substituted in expression way “rowtime - INTERVAL ‘string’ timeUnit”. We should be careful to add more keywords,  because they affect every query, not just DDL.

2. Reject to use PunctuatedWatermarkAssigner class name 'strategyClassName'

WATERMARK FOR columnName AS CUSTOM 'strategyClassName'
WATERMARK FOR columnName AS 'strategyClassName'

3. Reject to support an optional watermark name.

WATERMARK [watermarkName] FOR columnName AS ...

Watermark name is not necessary in the first version when we only have one watermark definition in DDL.

Proctime Attribute

The proctime attribute field definition can also be defined using computed-column. Such as:

pt as SYSTEM_PROCTIME() , which defines a proctime field named “pt” in the schema. Here SYSTEM_PROCTIME()is a built-in function to derive a proctime attribute field.

The same as above, the design of computed column is discussed in FLIP-70.


In this section, we will only discuss about the watermark implementation. The proctime attribute will be supported naturally when we support computed column. 

Note that, we will only support this in blink planner. In old flink planner, we will simply throw unsupported exception.

The core idea is that the framework could extract the rowtime attribute and watermark information from the properties and carry the information in TableSourceTable instead of in TableSource. When translateToPlan, we will look into the TableSourceTable to assign watermark or not. 

The steps will be as following:

  1. Parse DDL into SqlCreateTable with time attributes information
  2. Convert SqlCreateTable to CatalogTable in SqlToOperationConverter#convert
    1. time attribute information will be placed in CatalogTable#getTableSchema, called WatermarkSpec
    2. convert the time attribute information into properties.
  3. Pass the time attribute information (named TimeAttributeDescriptor) into TableSourceTable when DatabaseCalciteSchema#convertCatalogTable
  4. Create TableScan node (and computed column Project) and WatermarkAssigner node when FlinkRelOptTable#toRel
  5. Add a Rule to combine WatermarkAssigner and potential rowtime computed column project into TableScan. (DEFAULT_REWRITE optimization phase)
  6. Apply rowtime computed column if existed and watermark assigner according to the TimeAttributeDescriptor when StreamExecTableSourceScan#translateToPlan. Ignore TableSource’s DefinedRowtimeAttributes if TimeAttributeDescriptor exists. 

The properties representention of watermark can not be the same with the properties of Rowtime descriptor. Because the watermark structure proposed by this design can't fully consume Rowtime descriptor properties.

So for a statement `WATERMARK FOR f1.q2 AS f1.q2 - INTERVAL '5' SECOND`, the proposed properties might be:

'schema.watermark.0.rowtime' = 'f1.q2'
'schema.watermark.0.strategy.expression' = '`f1`.`q2` - INTERVAL '5' SECOND'
'schema.watermark.0.strategy.datatype' = 'TIMESTAMP(3)'

The conversion between WatermarkSpec and properties is handled by framework, and users shouldn't care about it. 

The connector developer can get the watermark information from the `CatalogTable#getTableSchema#getWatermarkSpecs` from `TableSourceFactory#createTableSource(ObjectPath tablePath, CatalogTable table)`.

Actually, the connector developers don't need to get watermark information for now, because the watermark assigner will be applied by framework automatically.

Deprecate DefinedRowtimeAttributes

Currently, in order to make the Rowtime descriptor work, the corresponding connector have to implement DefinedRowtimeAttributes interface, and handover the derived rowtime attribute to the interface. It is trivial and confused to users. 

In terms of theory, TableSource shouldn’t care about the rowtime attribute and watermark information. TableSource should only care about the physical deserialization and network transmission. The planner should already know all the information about rowtime and watermark and can automatically apply watermark assigner after the source transformation. 

We can deprecate DefinedRowtimeAttributes once we don't support temporal TableSource anymore.

Compatibility, Deprecation, and Migration Plan

  • All existing DDL syntax are still valid and compatible and have no changed behavior.
  • No new interface introduced for TableSource or TableSink, so the interfaces are still compatible. 

Test Plan

The implementation can be tested with unit tests for every new feature. And we can add integration tests for connectors to verify it can cooperate with existing source implementations.