Motivation

When users want to partially update a downstream table from multiple source tables, they usually write multiple `insert into ...` statements in SQL. This generates multiple datastreams in the running job, and for downstream storage such as data lakes there is a concurrency issue when multiple writers ingest data into one table at the same time. To avoid this, users may have to manually union all source tables in Flink SQL before sinking to the downstream table. The resulting SQL is hard to write and maintain. See the following example for details:

-- Source Input
source table1:id1(primary key),f1,f2,f3,null,null,null,null,null,null,... ...
source table2:id1(primary key),null,null,null,f4,f5,f6,null,null,null,... ...
source table3:id1(primary key),null,null,null,null,null,null,f7,f8,f9,... ... 
... ...
source table5:id1(primary key),... ...


-- Sink Output
sink table:id1,f1,f2,f3,f4,f5,f6,f7,f8,f9,... ...


-- Flink SQL  
INSERT INTO sink
SELECT  id,
        f1,
        f2,
        f3,
        f4,
        f5,
        f6,
        f7,
        f8,
        f9,
  ... ...
FROM (
    SELECT  id,
            f1,
            f2,
            f3,
            CAST(NULL as STRING) as f4,
            CAST(NULL as STRING) as f5,
            CAST(NULL as STRING) as f6,
            CAST(NULL as STRING) as f7,
            CAST(NULL as STRING) as f8,
            CAST(NULL as STRING) as f9,
            ... ...
    FROM  table1
    UNION ALL
    SELECT  id,
            CAST(NULL as STRING) as f1,
            CAST(NULL as STRING) as f2,
            CAST(NULL as STRING) as f3,
            f4,
            f5,
            f6,
            CAST(NULL as STRING) as f7,
            CAST(NULL as STRING) as f8,
            CAST(NULL as STRING) as f9,
            ... ...
    FROM  table2
    UNION ALL
    SELECT  id,
            CAST(NULL as STRING) as f1,
            CAST(NULL as STRING) as f2,
            CAST(NULL as STRING) as f3,
            CAST(NULL as STRING) as f4,
            CAST(NULL as STRING) as f5,
            CAST(NULL as STRING) as f6,
            f7,
            f8,
            f9,
            ... ...
    FROM  table3
    UNION ALL
    SELECT
            ... ...
    FROM  table4
    UNION ALL
    SELECT
            ... ...
    FROM  table5
)

When the sink table is wide (more than 100 columns) and is updated from more than 10 source tables, maintaining this SQL becomes almost unacceptable. It would be better if the Flink planner could reuse the sink nodes across multiple table sinks in this case. This would be a great usability improvement for users of partial-update features with data lake storages such as Paimon.


Examples of Use

Users partially update a downstream table using a Flink SQL statement set like this:

BEGIN STATEMENT SET;

INSERT INTO sink(pk, f1, f2, f3) 
SELECT 
	pk, 
	f1, 
	f2, 
	f3 
FROM table1;

INSERT INTO sink(pk, f4, f5, f6) 
SELECT 
	pk, 
	f4, 
	f5, 
	f6 
FROM table2;

INSERT INTO sink(pk, f7, f8, f9) 
SELECT 
	pk, 
	f7, 
	f8, 
	f9 
FROM table3;

... ...


INSERT INTO sink(pk, ... ...) 
SELECT 
	pk, 
	... ...
FROM table5;

END;


Before Sink Reuse

Multiple datastreams will write to the sink table concurrently. This can lead to performance degradation and consistency issues for downstream storages such as data lakes (Paimon, Hudi, etc.).

(Figure: job graph before sink reuse, using Paimon Sink as an example)


After Sink Reuse

A single unioned datastream will write to the sink table, so there are no concurrency concerns.

(Figure: job graph after sink reuse, using Paimon Sink as an example)


Public Interfaces

OptimizerConfigOptions

Option Name: table.optimizer.reuse-sink-enabled
Default Value: true
Description: When it is true, the optimizer will try to find duplicated table sinks and reuse them. This works only when TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED is true.
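
For reference, a minimal sketch of enabling the proposed option from the Table API is shown below; the option key is the one proposed above and does not exist yet in OptimizerConfigOptions:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EnableSinkReuseExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Sink reuse builds on sub-plan reuse, so both options need to be enabled
        // (sub-plan reuse is already true by default).
        tEnv.getConfig().set("table.optimizer.reuse-sub-plan-enabled", "true");
        tEnv.getConfig().set("table.optimizer.reuse-sink-enabled", "true");
    }
}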


Migrate TargetColumns From DynamicTableSink.Context to Connector abilities

This FLIP relies on the sink to tell the planner whether the target columns have actually been used by the sink. If this information has been applied to the sink, the planner must not reuse sinks with different target columns. Currently, the target columns are placed in the context of DynamicTableSink and handed to the sink one-way; the planner is not aware of whether the sink uses this information or not. In order to make the planner aware of the usage of target column writing, the target column writing should be migrated from the sink context to a connector ability.


Deprecate getTargetColumns() in DynamicTableSink.Context

    @PublicEvolving
    interface Context {

        ... ...

        /**
         * @deprecated use {@link
         *     org.apache.flink.table.connector.sink.abilities.SupportsTargetColumnWriting} instead.
         */
        @Deprecated(since = "2.1")
        Optional<int[][]> getTargetColumns();
    }


Introduce new sink ability SupportsTargetColumnWriting

SupportsTargetColumnWriting: interface for DynamicTableSinks that support target column writing.

package org.apache.flink.table.connector.sink.abilities;

/**
 * Interface for {@link DynamicTableSink}s that support target column writing.
 *
 * <p>The planner will parse target columns from the DML clause and call {@link
 * #applyTargetColumns(int[][])} to pass an array of column index paths to sink.
 *
 * <p>The array indices are 0-based and support composite columns within (possibly nested)
 * structures. This information comes from the column list of the DML clause, e.g., for a sink table
 * t1 whose schema is {@code a STRING, b ROW<b1 INT, b2 STRING>, c BIGINT}
 *
 * <ul>
 *   <li>insert: 'insert into t1(a, b.b2) ...', the column list will be 'a, b.b2', and will provide
 *       {@code [[0], [1, 1]]}. The statement 'insert into t1 select ...' will not apply this
 *       ability.
 *   <li>update: 'update t1 set a=1, b.b1=2 where ...', the column list will be 'a, b.b1', and will
 *       provide {@code [[0], [1, 0]]}.
 * </ul>
 *
 * <p>Note: Planner will not apply this ability for the delete statement because it has no column
 * list.
 *
 * <p>A sink can use this information to perform target columns writing.
 *
 * <p>If this interface is implemented and {@link #applyTargetColumns(int[][])} returns true, the
 * planner will use this information for plan optimizations such as sink reuse.
 */
@PublicEvolving
public interface SupportsTargetColumnWriting {

    /**
     * Provides an array of column index paths related to user specified target column list.
     *
     * <p>See the documentation of {@link SupportsTargetColumnWriting} for more information.
     *
     * @param targetColumns column index paths
     * @return true if the target columns are applied successfully, false otherwise.
     */
    boolean applyTargetColumns(int[][] targetColumns);
} 
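
For illustration, a minimal sketch of a sink using this ability is shown below. The class name and field are hypothetical, and the class is kept abstract so that the unrelated DynamicTableSink methods can be omitted:

import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsTargetColumnWriting;

/** Hypothetical sink that remembers the target columns it should partially update. */
public abstract class ExamplePartialUpdateSink
        implements DynamicTableSink, SupportsTargetColumnWriting {

    /** Index paths of the columns listed in the INSERT/UPDATE statement, or null if absent. */
    private int[][] targetColumns;

    @Override
    public boolean applyTargetColumns(int[][] targetColumns) {
        this.targetColumns = targetColumns;
        // Returning true tells the planner the information is really consumed, so sinks with
        // different target columns will not be reused with each other.
        return true;
    }
}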


TargetColumnWritingSpec: A sub-class of SinkAbilitySpec that supports TargetColumnWriting

package org.apache.flink.table.planner.plan.abilities.sink;


/**
 * A sub-class of {@link SinkAbilitySpec} that can not only serialize/deserialize the target
 * column indices to/from JSON, but can also apply the target columns to a {@link
 * SupportsTargetColumnWriting} sink.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonTypeName("TargetColumnWriting")
public class TargetColumnWritingSpec implements SinkAbilitySpec {
    public static final String FIELD_NAME_TARGET_COLUMNS = "targetColumns";

    @JsonProperty(FIELD_NAME_TARGET_COLUMNS)
    private final int[][] targetColumns;

    @JsonCreator
    public TargetColumnWritingSpec(@JsonProperty(FIELD_NAME_TARGET_COLUMNS) int[][] targetColumns) {
        this.targetColumns = targetColumns;
    }

    @Override
    public void apply(DynamicTableSink tableSink) {
        if (tableSink instanceof SupportsTargetColumnWriting) {
            ((SupportsTargetColumnWriting) tableSink).applyTargetColumns(targetColumns);
        } else {
            throw new TableException(
                    String.format(
                            "%s does not support SupportsTargetColumnWriting.",
                            tableSink.getClass().getName()));
        }
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        TargetColumnWritingSpec that = (TargetColumnWritingSpec) o;
        return Objects.deepEquals(targetColumns, that.targetColumns);
    }

    @Override
    public int hashCode() {
        return Arrays.deepHashCode(targetColumns);
    }
}


Proposed Changes

Overview

First, translate the target column information in SinkModifyOperation into the ability specs of the sink RelNodes.

Second, introduce a new SinkReuser in SubplanReuser to reuse duplicated sink RelNodes with the following behavior (a sketch follows this list):

  1. Iterate over all RelNodes and generate a digest for each sink node
  2. Group reusable sink nodes into a ReusableSinkGroup when they share the same digest, sinkAbilitySpecs and inputTraitSet
  3. Create a union node over the inputs of the sink nodes in a ReusableSinkGroup and place this union node in front of the reused sink node
  4. Remove all duplicated sink nodes and keep only the single reused sink of each ReusableSinkGroup
  5. Return the new list of RelNodes
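
A rough sketch of that pass is shown below. The class and its abstract helpers (isSinkNode, getDigest, unionInputsOf, replaceInput) are illustrative stand-ins for planner internals, not a committed API; only the control flow is meant to match the steps above.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.calcite.rel.RelNode;

/** Rough sketch of the proposed SinkReuser pass; helper methods are placeholders. */
public abstract class SinkReuserSketch {

    protected abstract boolean isSinkNode(RelNode node);

    protected abstract String getDigest(RelNode sinkNode);

    protected abstract RelNode unionInputsOf(List<RelNode> sinks);

    protected abstract RelNode replaceInput(RelNode sink, RelNode newInput);

    public List<RelNode> reuseDuplicatedSinks(List<RelNode> relNodes) {
        // Steps 1 + 2: group sink nodes by digest (the real pass additionally compares
        // sinkAbilitySpecs and the required input trait set, see ReusableSinkGroup below).
        Map<String, List<RelNode>> groups = new LinkedHashMap<>();
        List<RelNode> result = new ArrayList<>();
        for (RelNode node : relNodes) {
            if (isSinkNode(node)) {
                groups.computeIfAbsent(getDigest(node), k -> new ArrayList<>()).add(node);
            } else {
                result.add(node); // non-sink roots are kept as-is
            }
        }
        // Steps 3 + 4: per group, union all inputs and keep a single sink on top of the union.
        for (List<RelNode> group : groups.values()) {
            if (group.size() == 1) {
                result.add(group.get(0));
            } else {
                result.add(replaceInput(group.get(0), unionInputsOf(group)));
            }
        }
        // Step 5: return the new list of RelNodes.
        return result;
    }
}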


Sink Digest

The key to sink reuse is judging whether different sink nodes are equal. This can be achieved by extracting digest information from each sink node.


Factors considered for the sink node digest (a sketch of how they could be combined follows this list):

  1. sink table identifier
  2. sink table schema
  3. sink table hints
  4. sink table upsertMaterialize option (streaming mode only)
  5. sink target columns
    1. target columns will be removed from the digest once the getTargetColumns API is removed from DynamicTableSink.Context
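
For illustration, one way such a digest could be assembled is sketched below; the SinkDigestView accessors are placeholders for planner internals, and only the list of factors is taken from this proposal:

import java.util.Arrays;

/** Sketch only: the digest concatenates the factors listed above into one string. */
final class SinkDigestSketch {

    /** Placeholder view of a sink node exposing only the factors that feed the digest. */
    interface SinkDigestView {
        String tableIdentifier();
        String schemaString();
        String hintsString();
        boolean upsertMaterialize();
        int[][] targetColumns();
    }

    static String sinkDigest(SinkDigestView sink) {
        return String.join(
                "|",
                sink.tableIdentifier(),                     // 1. sink table identifier
                sink.schemaString(),                        // 2. sink table schema
                sink.hintsString(),                         // 3. sink table hints
                String.valueOf(sink.upsertMaterialize()),   // 4. upsertMaterialize (streaming mode)
                Arrays.deepToString(sink.targetColumns())); // 5. target columns (until removed)
    }
}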


ReusableSinkGroup

Sinks that are reusable should be grouped into a ReusableSinkGroup. Sinks in a ReusableSinkGroup share the same digest, sinkAbilitySpecs and inputTraitSet.

public class ReusableSinkGroup {

    private final List<Sink> originalSinks = new ArrayList<>();
    private final SinkAbilitySpec[] sinkAbilitySpecs;
    private final RelTraitSet inputTraitSet;
    private final String digest;

    ReusableSinkGroup(
            Sink sink,
            String digest,
            SinkAbilitySpec[] sinkAbilitySpecs,
            RelTraitSet inputTraitSet) {
        this.originalSinks.add(sink);
        this.sinkAbilitySpecs = sinkAbilitySpecs;
        this.inputTraitSet = inputTraitSet;
        this.digest = digest;
    }
}
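
A membership check for the group could then look roughly as follows; this is an illustrative method that would live inside ReusableSinkGroup, not a committed signature. A candidate sink joins a group only when all three pieces of state match:

    // Illustrative only: how a candidate sink could be matched against this group.
    boolean canBeReusedBy(
            String candidateDigest,
            SinkAbilitySpec[] candidateAbilitySpecs,
            RelTraitSet candidateInputTraitSet) {
        return digest.equals(candidateDigest)
                && Arrays.deepEquals(sinkAbilitySpecs, candidateAbilitySpecs)
                && inputTraitSet.equals(candidateInputTraitSet);
    }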


Rejected Alternatives

Introduce SupportsSinkReuse Sink Ability

Introduce a new interface SupportsSinkReuse in Connector sink abilities

@PublicEvolving
public interface SupportsSinkReuse {

	
	public boolean equals(Object sink);
}


Connectors that need the sink reuse feature would have to implement this interface. The planner would use this interface instead of the digest string to tell whether two sink nodes are equal.

This is easy to implement on the planner side, but it requires every connector to be adapted before the feature can be used. In my view, this introduces extra trivial work that is not worthwhile.


Add Extra Option to Consider Target Columns in Sink Digest

For certain corner cases, users might want target columns to be considered when deciding whether sink nodes are equal. An extra option like table.optimizer.reuse-sink.ignore-target-columns with a default value of true could be introduced for this.

However, introducing two extra options seems a little tedious. For those corner cases, users can simply choose not to enable the sink reuse feature.


Introduce SupportsMerge Sink Ability

Introduce a new interface SupportsMerge in the connector sink abilities. When the planner tries to reuse sinks with different target columns, this ability would merge the target columns from multiple sources and apply the merged information to the connector.

However, this would introduce extra complexity to the solution without much additional benefit.


Compatibility, Deprecation, and Migration Plan

Connector developers should use the new interface SupportsTargetColumnWriting to apply target column writing. The API Context#getTargetColumns needs to be marked with @Deprecated in 2.1 and removed in 2.2 or later.


Test Plan

Add unit tests in BatchSinkReuseTest and StreamSinkReuseTest.

Add an integration test in SinkReuseITCase.