Discussion thread

Vote thread

JIRA: FLINK-31791 - Getting issue details... STATUS  

Released: 1.18.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


The main purpose of this FLIP is to support operator-level state TTL configuration for Table API & SQL programs via compiled JSON plan. We will explain the benefits of this feature and why we propose to do so.

Nowadays, the Table & SQL API is prevailing for expressing pipelines that perform stateful stream processing, and the state TTL for all stateful operators is configured at the job level granularity by table configuration 'table.exec.state.ttl'.  Nevertheless, there are some situations where the requirement for state retention is different within the same job, such as order logistics tracking, and cross-day cumulative metrics computation. By fine-grained TTL configuration to control state retention, users can reduce the stage usage to their needs.

To name a few scenarios

By now, we have explained the reasons why we believe this feature is valuable. Next, let's explain the current job-level state TTL mechanism and how we plan to implement the operator-level configuration.

Explanation of the Current Mechanism

By default, the SQL text is first parsed to operations and optimized to exec graph, and then transformed to transformation. 

During the transformation, for the exec node that creates the stateful operator and uses TTL to control retention time, the 'table.exec.state.ttl' is retrieved from the config and passed to the operator as a long value with ms as the time unit (see ExecNode#translateToPlanInternal). 

Theoretically, it is possible to achieve fine-grained control of the state TTL by having each operator receive a different value in this process.

Terminology Verification

We would like to clarify the term "state TTL at operator level" or "state TTL per operator" before going ahead, because the underlying implementation for the stateful operator may contain multiple states (i.e. 1-v-N relationship).


  • For the retract rank with a constant rank range, KeyedProcessOperator with RetractableTopNFunction uses a MapState to store the mapping from the sort key to the record list and a ValueState to store the mapping from the sort key to the record count.
  • For a regular stream join, StreamingJoinOperator uses two JoinRecordStateView to store the join state from the left and right sides.

As a corollary, we propose to define the operator's TTL configuration granularity to be tied to the state of each input of the operator, which does not expose the internal state implementation to users, illustrated in Fig1. 

Fig.1 Illustration for configuration granularity for stateful operator

On the other hand, some stateful operators don't rely on the StateTtlConfig to control state retention. E.g. window-based aggregation and interval/window join operation are excluded from the discussion scope of this FLIP.

Important Note

  1. The current state TTL is based on the initialization time of each operator, which is inherently unaligned. This means under some rare conditions where the data did not expire in the upstream operator, but expired in the downstream operator, resulting in abnormal calculation results or direct exceptions thrown by the operator. The probability of such unalignment is magnified now that fine-grained operator-level TTL is supported. While on the other hand, this FLIP is not the root cause of this issue. To systematically solve the problem of TTL unalignment between operators,  we need a larger FLIP to accomplish this.
  2. This is a feature for advanced users and should be used with caution.

Explanation of the Proposed Design

Firstly, we evaluated and rejected  

  • Use SQL hints /*+ OPTIONS('table.exec.state.ttl' = '...') */,  the reason is explained in the Rejected Alternatives part.

  • Introduce new configuration keys. We don't want to increase the burden of usage.

Meanwhile, we noticed that FLIP-190: Support Version Upgrades for Table API & SQL Programs has exposed the JSON plan concept with SQL syntax COMPILE [AND EXECUTE] PLAN '${plan_file}' FOR <insert_query> to users. While it was originally intended to ensure Table API & SQL job's backward compatibility, it also decouples the pipeline compiling from running, and provides more flexibility and configurability for job recovery via JSON files.

The compiled plan has a good shape to express the topology and an adequate description of the generated ExecNode. Currently, the JSON file includes the exec node's type (such as table scan/sink or calc, etc.), input properties, and output type, etc. It's natural for us to consider including the state TTL configuration in the plan as well, to achieve the goal that passing different TTL values to the different stateful operators.

The simplest way is to let 'table.exec.state.ttl' as consumed options for the exec node, which generates stateful operators. However, this is not our approach for the following reasons.

  1. This breaks the design purpose for FLIP-190 since consumed options should only persist the configurations which may affect topology. There is also a hotfix to exclude this configuration from being persisted, and there is no reason to add it back.

  2. This approach only works for OneInputStreamOperator.

As a result, we propose to persist the state TTL configuration to the compiled JSON file with a more generalized structure.


For ExecNodes which generate stateful OneInputStreamOperator

  "id": 4,
  "type": "stream-exec-deduplicate_1",
  "configuration": {...},
  "uniqueKeys": [...],
  "inputProperties": [...],
  "outputType": {...},
  "description": "Deduplicate(keep=[FirstRow], key=[order_id], order=[PROCTIME])",
+  "state": [
+    {
+      "index": 0,
+      "ttl": "3600000 ms",
+      "name": "deduplicateState"
+    }
+  ]

For ExecNodes which generate stateful TwoInputStreamOperator

  "id": 7,
  "type": "stream-exec-join_1",
  "joinSpec": {
    "joinType": "INNER",
    "leftKeys": [...],
    "rightKeys": [...],
    "filterNulls": [...],
    "nonEquiCondition": null
  "leftUpsertKeys": [...],
  "rightUpsertKeys": [...],
  "inputProperties": [...],
  "outputType": "...",
  "description": "Join(joinType=[InnerJoin], where=[...], select=[...], leftInputSpec=[...], rightInputSpec=[...])",
+  "state": [
+    {
+      "index": 0,
+      "ttl": "259200000 ms",
+      "name": "join-lef-state"
+    },
+    {
+      "index": 1,
+      "ttl": "86400000 ms",
+      "name": "join-right-state"
+    }
+  ]

For ExecNodes which generate stateful MultipleInputStreamOperator

  "id": 10,
  "type": "stream-exec-multi-xx_1",
  "description": "xxx"
+  "state": [
+    {
+      "index": 0,
+      "ttl": "259200000 ms",
+      "name": "${state-name-0}"
+    },
+    {
+      "index": 1,
+      "ttl": "86400000 ms",
+      "name": "${state-name-1}"
+    },
+    {
+      "index": 2,
+      "ttl": "86400000 ms",
+      "name": "${state-name-2}"
+    },
+    ...
+  ]

Public Interfaces

The proposed changes only introduce classes annotated with @Internal.

 * It is used to describe the state metadata of a stateful operator, which is
 * serialized/deserialized into/from those {@link
 *}s that can generate stateful
 * operators. For ExecNodes that generates {@link
 * org.apache.flink.streaming.api.operators.TwoInputStreamOperator} or {@link
 * org.apache.flink.streaming.api.operators.MultipleInputStreamOperator}, there will be multiple
 * metadata describing information about each input's state.
 * <p>The metadata describes the following attributes.
 * <ul>
 *   <li>{@code stateIndex}: annotates the state is from the i-th input, based on zero
 *   <li>{@code ttl}: annotates the state retention time for the i-th input's state
, the time unit is ms
 *   <li>{@code name}: annotates the state, such as deduplicate-state, join-left-state.
 * </ul>
@JsonIgnoreProperties(ignoreUnknown = true)
public class StateMetadata {

    public static final String FIELD_NAME_STATE_INDEX = "index";
    public static final String FIELD_NAME_STATE_TTL = "ttl";
    public static final String FIELD_NAME_STATE_NAME = "name";

    @JsonProperty(value = FIELD_NAME_STATE_INDEX, index = 0)
    private final int stateIndex;
    @JsonProperty(value = FIELD_NAME_STATE_TTL, index = 1)
    private final long stateTtl;

    @JsonProperty(value = FIELD_NAME_STATE_NAME, index = 2)
    private final String stateName;

    public StateMetadata(
            @JsonProperty(FIELD_NAME_STATE_INDEX) int stateIndex,
            @JsonProperty(FIELD_NAME_STATE_TTL) long stateTtl,
            @JsonProperty(FIELD_NAME_STATE_NAME) String stateName) {
        this.stateIndex = stateIndex;
        this.stateTtl = stateTtl;
        this.stateName = stateName;
    public boolean equals(Object o) {

    public int hashCode() {

Note: state index annotates the index of the operator's input.

Proposed Changes


  • Supplement to FLIP-190
    • Extend reading from/writing to a local file to Flink's FileSystem and support the SQL gateway usage
    • Implement the syntax "EXPLAIN PLAN 'plan.json'" syntax, which is proposed in FLIP-190; the Table API has been implemented
  • Introduce StateMetadata serialization/deserialization to the current compiled plan. StateMetadata will be the member variable to all ExecNodes that translate to TTL-sensitive stateful operators.

Detailed Explanation

At present, COMPILE/EXECUTE PLAN FOR '${plan.json}' only supports writing to/reading from a local file without the scheme. We propose to extend the support for Flink's FileSystem.

-- before
COMPILE PLAN FOR '/tmp/foo/bar.json' <insert_query>
EXECUTE PLAN FOR '/tmp/foo/bar.json'

-- after
COMPILE PLAN FOR 'file:///tmp/foo/bar.json' <insert_query>
COMPILE PLAN FOR 'hdfs:///tmp/foo/bar.json' <insert_query>
COMPILE PLAN FOR 's3:///tmp/foo/bar.json' <insert_query>
COMPILE PLAN FOR 'oss:///tmp/foo/bar.json' <insert_query>

EXECUTE PLAN FOR 'file:///tmp/foo/bar.json'
EXECUTE PLAN FOR 'hdfs:///tmp/foo/bar.json'
EXECUTE PLAN FOR 's3:///tmp/foo/bar.json'
EXECUTE PLAN FOR 'oss:///tmp/foo/bar.json'

In addition, since FLIP-275: Support Remote SQL Client Based on SQL Gateway supports SQL client connecting to the remote gateway, and to make the user story complete, the remote JSON plan should also be added to the resource management for the Gateway mode, similar to "ADD JAR" operation, auto-downloading when the session starts and cleaning it up when the session ends. 

Meanwhile, FLIP-190 has proposed EXPLAIN PLAN 'plan.json' and the corresponding Table API but lacks of syntax support. In order to let users understand the compiled plan easier and complete the user story, we propose adding the syntax support.

// allows to get insights (which insights need to be discussed)

// and basic validation of the plan (e.g. resolution of catalog objects)

EXPLAIN PLAN '/mydir/plan.json';

We propose to serialize/deserialize StateMetadata for those ExecNodes which generate stateful operators and use TTL to control retention time.

Applied ExecNodes with OneInputStreamOperator

  • StreamExecChangelogNormalize 
  • StreamExecDeduplicate 
  • StreamExecGlobalGroupAggregate 
  • StreamExecGroupAggregate 
  • StreamExecGroupTableAggregate (*): serialization/deserialization not supported yet
  • StreamExecIncrementalGroupAggregate 
  • StreamExecLimit 
  • StreamExecLookupJoin (*): will transform to the stateful KeyedProcessOperator when materialization is needed
  • StreamExecPythonGroupAggregate 
  • StreamExecRank
  • StreamExecSink(*): will add stateful SinkUpsertMaterializer if the sink's PK is different from the upsert key
  • StreamExecSortLimit

Applied ExecNodes with TwoInputStreamOperator

  • StreamExecJoin
  • StreamExecTemporalJoin 

At present, there is no ExecNode that translates to a MultipleInputStreamOperator.

The following snippet of code aims to illustrate the change made on StreamExecJoin

       name = "stream-exec-join",
       version = 1,
       producedTransformations = StreamExecJoin.JOIN_TRANSFORMATION,
       minPlanVersion = FlinkVersion.v1_15,
       minStateVersion = FlinkVersion.v1_15)

+       name = "stream-exec-join",
+       version = 2,
+       producedTransformations = StreamExecJoin.JOIN_TRANSFORMATION,
+       minPlanVersion = FlinkVersion.v1_18,
+       minStateVersion = FlinkVersion.v1_15)
public class StreamExecJoin extends ExecNodeBase<RowData>
       implements StreamExecNode<RowData>, SingleTransformationTranslator<RowData> {

+  @Nullable
+  @JsonProperty(FIELD_NAME_STATE)
+  private final List<StateMetadata> stateMetadataList;

   public StreamExecJoin(
           ReadableConfig tableConfig,
           JoinSpec joinSpec,
           List<int[]> leftUpsertKeys,
           List<int[]> rightUpsertKeys,
           InputProperty leftInputProperty,
           InputProperty rightInputProperty,
           RowType outputType,
           String description) {
               ExecNodeContext.newPersistedConfig(StreamExecJoin.class, tableConfig),
               Lists.newArrayList(leftInputProperty, rightInputProperty),
+              StateMetadata.multiInputDefaultMeta(tableConfig, LEFT_STATE_NAME, RIGHT_STATE_NAME));

   public StreamExecJoin(
           @JsonProperty(FIELD_NAME_ID) int id,
           @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
           @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
           @JsonProperty(FIELD_NAME_JOIN_SPEC) JoinSpec joinSpec,
           @JsonProperty(FIELD_NAME_LEFT_UPSERT_KEYS) List<int[]> leftUpsertKeys,
           @JsonProperty(FIELD_NAME_RIGHT_UPSERT_KEYS) List<int[]> rightUpsertKeys,
           @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
           @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
           @JsonProperty(FIELD_NAME_DESCRIPTION) String description,
+          @Nullable
+          @JsonProperty(FIELD_NAME_STATE) List<StateMetadata> stateMetadataList) {
       super(id, context, persistedConfig, inputProperties, outputType, description);
       checkArgument(inputProperties.size() == 2);
       this.joinSpec = checkNotNull(joinSpec);
       this.leftUpsertKeys = leftUpsertKeys;
       this.rightUpsertKeys = rightUpsertKeys;
+      this.stateMetadataList = stateMetadataList;
   protected Transformation<RowData> translateToPlanInternal(
           PlannerBase planner, ExecNodeConfig config) {
       final ExecEdge leftInputEdge = …;
       final ExecEdge rightInputEdge = …;
       . . .

+      // for backward compatibility
+      long leftStateRetentionTime =
+              isNullOrEmpty(stateMetadataList)
+                      ? config.getStateRetentionTime()
+                      : stateMetadataList.get(0).getStateTtl();
+      long rightStateRetentionTime =
+              isNullOrEmpty(stateMetadataList)
+                      ? leftStateRetentionTime
+                      : stateMetadataList.get(1).getStateTtl();

       AbstractStreamingJoinOperator operator;
       FlinkJoinType joinType = joinSpec.getJoinType();
       if (joinType == FlinkJoinType.ANTI || joinType == FlinkJoinType.SEMI) {
           operator =
                   new StreamingSemiAntiJoinOperator(
                           joinType == FlinkJoinType.ANTI,
+                          rightStateRetentionTime);
       } else {

           operator =
                   new StreamingJoinOperator(
+                          rightStateRetentionTime);

      . . .
       return transform;

End-to-End Usage Example

Operator-level state TTL control to optimize state usage is an advanced feature. Not all SQL/Table API users have this requirement.

For the SQL jobs that

  • does not involve stateful computation
  • has only one stateful operator to compute data (set 'table.exec.state.ttl' is good enough)
  • does not need to enable fine-grained TTL tune (set 'table.exec.state.ttl' is good enough)

the submission process is not affected.


Configure different state TTLs for the same table source during different transformation phases. We will use 1h for deduplication and 24h for group aggregate.

    order_id     STRING,
    user_id      STRING,
    product_id   STRING,
    num          BIGINT,
    proctime AS PROCTIME()
) WITH ('connector' = 'datagen');

    product_id   STRING,
    buy_uv       BIGINT,
    order_cnt    BIGINT,
    quantity_cnt BIGINT
) WITH ('connector' = 'blackhole');

foo/bar/example.json' FOR 
SELECT product_id, 
       COUNT(DISTINCT user_id) AS buy_uv,
       COUNT(order_id) AS order_cnt,
       SUM(num) AS quantity_cnt
    SELECT *, ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) AS rn 
    FROM Orders
) tmp 
WHERE rn = 1
GROUP BY product_id;

This will generate the example.json under the directory /foo/bar. Users can change the TTL as needed and then submit the job via

Execute Plan
EXECUTE PLAN 'file:/foo/bar/example.json'


The following example illustrates the order logistic tracking scenario. It is often the case that the shipment status of the product changes over multiple days, and 'table.exec.state.ttl' is usually configured to the longest transition period.

With this FLIP, users can configure the left and right state retention time by modifying the serialized JSON file to reduce state usage.

SET 'table.exec.state.ttl' = '7d';

    gmt_create        TIMESTAMP(3),
    gmt_modified      TIMESTAMP(3),
    order_id     	  STRING NOT NULL,
) WITH ('connector' = '...');

CREATE TABLE Logistics (
    gmt_create        TIMESTAMP(3),
    gmt_modified      TIMESTAMP(3),
    order_line_id     STRING NOT NULL,
    order_id          STRING NOT NULL,
    product_id        STRING NOT NULL,
    shipment_status   STRING,
    PRIMARY KEY(order_id, order_line_id) NOT ENFORCED
) WITH ('connector' = '...');

CREATE TABLE EnrichedOrders (
    order_line_id      STRING NOT NULL,
    order_id           STRING NOT NULL,
    product_id         STRING NOT NULL,
    order_placed_ts    TIMESTAMP(3),
    order_shipped_ts   TIMESTAMP(3),
    PRIMARY KEY(order_line_id, order_id) NOT ENFORCED
) WITH ('connector' = '...');

SELECT gmt_modified AS order_shipped_ts,
FROM Logistics
WHERE shipment_status = 'SHIPPED';

SELECT tmp_b.order_line_id,
       tmp_a.gmt_create AS order_place_ts,
FROM Orders tmp_a JOIN order_shipped tmp_b ON tmp_a.order_id = tmp_b.order_id;

COMPIPE PLAN 'file:/foo/bar/join.json' FOR
INSERT INTO EnrichedOrders
SELECT * FROM enriched_view;

A snippet of the join node is illustrated as follows. 

  "id": 7,
  "type": "stream-exec-join_1",
  "joinSpec": {
    "joinType": "INNER",
    "leftKeys": [
    "rightKeys": [
    "filterNulls": [
    "nonEquiCondition": null
  "leftUpsertKeys": [
  "rightUpsertKeys": [
  "inputProperties": [
      "requiredDistribution": {
        "type": "UNKNOWN"
      "damBehavior": "PIPELINED",
      "priority": 0
      "requiredDistribution": {
        "type": "UNKNOWN"
      "damBehavior": "PIPELINED",
      "priority": 0
  "outputType": "ROW<`gmt_create` TIMESTAMP(3), `order_id` VARCHAR(2147483647) NOT NULL, `order_shipped_ts` TIMESTAMP(3), `order_line_id` VARCHAR(2147483647) NOT NULL, `order_id0` VARCHAR(2147483647) NOT NULL, `product_id` VARCHAR(2147483647) NOT NULL>",
  "description": "Join(joinType=[InnerJoin], where=[(order_id = order_id0)], select=[gmt_create, order_id, order_shipped_ts, order_line_id, order_id0, product_id], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[HasUniqueKey])",
  "state": [
      "index": 0,
      "ttl": "604800000 ms",
      "name": "join-left-state"
      "index": 1,
      "ttl": "86400000 ms",
      "name": "join-right-state"

Compatibility, Deprecation, and Migration Plan

This proposal does not affect backward compatibility and savepoint compatibility, and thus the minStateVersion remains the same.

The implementation will make sure that

  • For those plans which are compiled before this FLIP, #translateToPlanInternal will let the state retention time still be retrieved from the table config, other than the state metadata.
  • For TwoInputStreamOperators like StreamJoinOperators, the newly introduced rightStateRetentionTime will be assigned to the value of leftStateRetentionTime for old plans.

According to FLIP-190

2. The JSON node of ExecNode A gets an additional property in Flink 1.16:


some-prop: 42


some-prop: 42,
some-flag: false

Every plan change will increase the ExecNode version. Since the ExecNode supports both plan layouts, adding an additional annotation is enough.

@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)
@ExecNodeMetadata(name=A, version=2, minPlanVersion=1.16, minStateVersion=1.15)
class MyExecNode extends ExecNode {...}

New plans will use the new ExecNode version 2.

For the above-listed ExecNodes, we will propose adding a new ExecNodeMetadata.

@ExecNodeMetadata(name="${exec-node-name}", version=2, minPlanVersion=FlinkVersion.v1_18, minStateVersion=FlinkVersion.v1_15)

Test Plan

The test will focus on

  • Functionality Test
    • The operator-level state TTL configuration works as expected.
  • Compatibility Test
    • Old plans without state metadata can still be restored using the Flink version with this change.

Rejected Alternatives

SQL Hints

We reject using SQL hints to configure state TTL for the following reasons.

  • As described in hints, hints aim to provide statistics/metadata info to the optimizer to get better execution plans. From the semantic perspective, 'table.exec.state.ttl' is a configuration that influences the computation result. 

  • Under the hood, hints configure TTL from the SQL perspective and ultimately act on the individual operator(s) of the job. By using hints, we are impractically assuming that users understand what SQL relational algebraic expressions ultimately translate to what operators. 

  • Some stateful operators like ChangelogNormalize and SinkUpsertMaterializer are derived from the planner implicitly, and it's difficult to configure TTL for these operators using hints.

Add table.exec.state.ttl to consumed options

This does not work for TwoInputStreamOperator and MultipleInputStreamOperator. Besides, it breaks the design for FLIP-190.