Motivation
When users want to partially update a downstream table from multiple source tables, they usually write multiple `INSERT INTO ...` statements in SQL. This generates multiple datastreams in the running job, and for downstream storage like data lakes there are concurrency issues when multiple writers ingest data into one table at the same time. In this case users may have to manually union all source tables in Flink SQL before sinking to the downstream table. The resulting SQL is hard to write and maintain. See the following example for details:
-- Source Input
source table1:id1(primary key),f1,f2,f3,null,null,null,null,null,null,... ...
source table2:id1(primary key),null,null,null,f4,f5,f6,null,null,null,... ...
source table3:id1(primary key),null,null,null,null,null,null,f7,f8,f9,... ...
... ...
source table5:id1(primary key),... ...
-- Sink Output
sink table:id1,f1,f2,f3,f4,f5,f6,f7,f8,f9,... ...
-- Flink SQL
INSERT INTO sink
SELECT id,
f1,
f2,
f3,
f4,
f5,
f6,
f7,
f8,
f9,
... ...
FROM (
SELECT id,
f1,
f2,
f3,
CAST(NULL as STRING) as f4,
CAST(NULL as STRING) as f5,
CAST(NULL as STRING) as f6,
CAST(NULL as STRING) as f7,
CAST(NULL as STRING) as f8,
CAST(NULL as STRING) as f9,
... ...
FROM table1
UNION ALL
SELECT id,
CAST(NULL as STRING) as f1,
CAST(NULL as STRING) as f2,
CAST(NULL as STRING) as f3,
f4,
f5,
f6,
CAST(NULL as STRING) as f7,
CAST(NULL as STRING) as f8,
CAST(NULL as STRING) as f9,
... ...
FROM table2
UNION ALL
SELECT id,
CAST(NULL as STRING) as f1,
CAST(NULL as STRING) as f2,
CAST(NULL as STRING) as f3,
CAST(NULL as STRING) as f4,
CAST(NULL as STRING) as f5,
CAST(NULL as STRING) as f6,
f7,
f8,
f9,
... ...
FROM table3
UNION ALL
SELECT
... ...
FROM table4
UNION ALL
SELECT
... ...
FROM table5
)
When the sink table has wide columns (>100) and is updated from more than 10 source tables, maintaining this SQL is almost unacceptable. It would be better if the Flink planner could reuse the sink nodes across multiple table sinks in this case. This would be a great usability improvement for users using partial-update features with data lake storages like Paimon.
Examples of Use
Users are partial-updating a downstream table using Flink SQL like this:
BEGIN STATEMENT SET;
INSERT INTO sink(pk, f1, f2, f3) SELECT pk, f1, f2, f3 FROM table1;
INSERT INTO sink(pk, f4, f5, f6) SELECT pk, f4, f5, f6 FROM table2;
INSERT INTO sink(pk, f7, f8, f9) SELECT pk, f7, f8, f9 FROM table3;
... ...
INSERT INTO sink(pk, ... ...) SELECT pk, ... ... FROM table5;
END;
Before Sink Reuse
Multiple datastreams will write to the sink table concurrently. This can lead to performance degradation and consistency issues for downstream storages like data lakes (Paimon, Hudi, etc.).
(using Paimon Sink as an example)
After Sink Reuse
A unified datastream will write to the sink table, so there are no concerns about concurrency issues.
(using Paimon Sink as an example)
Public Interfaces
OptimizerConfigOptions
Option Name | Default Value | Description
table.optimizer.reuse-sink-enabled | true | When it is true, the optimizer will try to find out duplicated table sinks and reuse them. This works only when table.optimizer.reuse-sub-plan-enabled is true.
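For illustration, the option can be set on the table environment's configuration using the string key above. This is a minimal sketch assuming an existing streaming TableEnvironment; the class name is illustrative.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EnableSinkReuseExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Sink reuse is enabled by default; shown here only to illustrate how the option is configured.
        tEnv.getConfig().set("table.optimizer.reuse-sink-enabled", "true");
    }
}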
Migrate TargetColumns From DynamicTableSink.Context to Connector abilities
This FLIP relies on the sink to tell the planner whether the target columns are actually used by the sink. If this information has been applied to the sink, the planner should not reuse sinks with different target columns. Currently, the target columns are placed in the context of DynamicTableSink and are offered to the sink one-sidedly; the planner is not aware of whether the sink uses this information or not. In order to make the planner aware of the usage of target column writing, target column writing should be migrated from the sink context to the connector abilities.
Deprecate getTargetColumns() in DynamicTableSink.Context
@PublicEvolving
interface Context {

    ... ...

    /**
     * @deprecated use {@link
     *     org.apache.flink.table.connector.sink.abilities.SupportsTargetColumnWriting} instead.
     */
    @Deprecated(since = "2.1")
    Optional<int[][]> getTargetColumns();
}
Introduce new sink ability SupportsTargetColumnWriting
SupportsTargetColumnWriting: Interface for DynamicTableSinks that support target column writing.
package org.apache.flink.table.connector.sink.abilities;

/**
 * Interface for {@link DynamicTableSink}s that support target column writing.
 *
 * <p>The planner will parse the target columns from the DML clause and call {@link
 * #applyTargetColumns(int[][])} to pass an array of column index paths to the sink.
 *
 * <p>The array indices are 0-based and support composite columns within (possibly nested)
 * structures. This information comes from the column list of the DML clause, e.g., for a sink
 * table t1 whose schema is {@code a STRING, b ROW < b1 INT, b2 STRING>, c BIGINT}:
 *
 * <ul>
 *   <li>insert: 'insert into t1(a, b.b2) ...', the column list will be 'a, b.b2', and the planner
 *       will provide {@code [[0], [1, 1]]}. The statement 'insert into t1 select ...' will not
 *       apply this ability.
 *   <li>update: 'update t1 set a=1, b.b1=2 where ...', the column list will be 'a, b.b1', and the
 *       planner will provide {@code [[0], [1, 0]]}.
 * </ul>
 *
 * <p>Note: The planner will not apply this ability for the delete statement because it has no
 * column list.
 *
 * <p>A sink can use this information to perform target column writing.
 *
 * <p>If this interface is implemented and {@link #applyTargetColumns(int[][])} returns true, the
 * planner will use this information for plan optimizations such as sink reuse.
 */
@PublicEvolving
public interface SupportsTargetColumnWriting {

    /**
     * Provides an array of column index paths related to the user-specified target column list.
     *
     * <p>See the documentation of {@link SupportsTargetColumnWriting} for more information.
     *
     * @param targetColumns column index paths
     * @return true if the target columns are applied successfully, false otherwise.
     */
    boolean applyTargetColumns(int[][] targetColumns);
}
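To illustrate how a connector might consume this ability, here is a minimal, hypothetical sketch. The class name, package, and the stubbed runtime provider are illustrative assumptions and not part of this FLIP.

package com.example.connector.sink; // hypothetical package

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsTargetColumnWriting;

/** A hypothetical partial-update sink that consumes the target columns applied by the planner. */
public class MyPartialUpdateSink implements DynamicTableSink, SupportsTargetColumnWriting {

    // e.g. [[0], [1, 1]] for the target column list "a, b.b2"
    private int[][] targetColumns;

    @Override
    public boolean applyTargetColumns(int[][] targetColumns) {
        this.targetColumns = targetColumns;
        // Returning true tells the planner this information is consumed, so it will not
        // reuse sinks that were given different target columns.
        return true;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return requestedMode;
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        // Build the actual runtime writer here, honoring this.targetColumns (omitted in this sketch).
        throw new UnsupportedOperationException("Sketch only");
    }

    @Override
    public DynamicTableSink copy() {
        MyPartialUpdateSink copy = new MyPartialUpdateSink();
        copy.targetColumns = targetColumns;
        return copy;
    }

    @Override
    public String asSummaryString() {
        return "MyPartialUpdateSink";
    }
}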
TargetColumnWritingSpec: A sub-class of SinkAbilitySpec that supports TargetColumnWriting
package org.apache.flink.table.planner.plan.abilities.sink;

/**
 * A sub-class of {@link SinkAbilitySpec} that can not only serialize/deserialize the writing
 * target column indices to/from JSON, but also can write the target columns for {@link
 * SupportsTargetColumnWriting}.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonTypeName("TargetColumnWriting")
public class TargetColumnWritingSpec implements SinkAbilitySpec {

    public static final String FIELD_NAME_TARGET_COLUMNS = "targetColumns";

    @JsonProperty(FIELD_NAME_TARGET_COLUMNS)
    private final int[][] targetColumns;

    @JsonCreator
    public TargetColumnWritingSpec(@JsonProperty(FIELD_NAME_TARGET_COLUMNS) int[][] targetColumns) {
        this.targetColumns = targetColumns;
    }

    @Override
    public void apply(DynamicTableSink tableSink) {
        if (tableSink instanceof SupportsTargetColumnWriting) {
            ((SupportsTargetColumnWriting) tableSink).applyTargetColumns(targetColumns);
        } else {
            throw new TableException(
                    String.format(
                            "%s does not support SupportsTargetColumnWriting.",
                            tableSink.getClass().getName()));
        }
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        TargetColumnWritingSpec that = (TargetColumnWritingSpec) o;
        return Objects.deepEquals(targetColumns, that.targetColumns);
    }

    @Override
    public int hashCode() {
        return Arrays.deepHashCode(targetColumns);
    }
}
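For context, here is a rough sketch of how the planner could construct and apply this spec for the 'insert into t1(a, b.b2) ...' example above; the local variable names are illustrative.

// Column index paths parsed from the target column list "a, b.b2".
int[][] targetColumns = new int[][] {{0}, {1, 1}};

// The planner keeps the spec on the sink RelNode (and in the serialized plan) ...
SinkAbilitySpec spec = new TargetColumnWritingSpec(targetColumns);

// ... and applies it to the DynamicTableSink instance during translation.
spec.apply(dynamicTableSink);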
Proposed Changes
Overview
First, parse the target column information from SinkModifyOperation into the ability specs of the sink RelNodes
Second, introduce a new SinkReuser in SubplanReuser to reuse duplicated sink RelNodes with the following behaviors (see the sketch after this list):
- Iterate over all RelNodes and generate a digest for each sink node
- Group reusable sink nodes into a ReusableSinkGroup if two sink nodes have the same digest, sinkAbilitySpecs and inputTraitSet
- Create a union node over the inputs of the sink nodes in a ReusableSinkGroup and place this union node in front of the sink node
- Remove the now-redundant sink nodes and keep only the reused sink in a ReusableSinkGroup
- Return a new list of RelNodes
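The following is a rough, hedged sketch of that pass; the helper methods buildDigest, findMatchingGroup, abilitySpecsOf, createUnion and the accessors on ReusableSinkGroup are purely illustrative and not part of this FLIP.

// Illustrative sketch only; the actual SinkReuser will live in the planner and differ in detail.
public List<RelNode> reuseDuplicatedSinks(List<RelNode> relNodes) {
    List<ReusableSinkGroup> groups = new ArrayList<>();
    List<RelNode> result = new ArrayList<>();

    for (RelNode node : relNodes) {
        if (!(node instanceof Sink)) {
            result.add(node);
            continue;
        }
        Sink sink = (Sink) node;
        String digest = buildDigest(sink); // see "Sink Digest" below
        // A sink joins an existing group only if digest, sinkAbilitySpecs and inputTraitSet match.
        ReusableSinkGroup group = findMatchingGroup(groups, sink, digest);
        if (group != null) {
            group.addSink(sink); // hypothetical accessor
        } else {
            groups.add(new ReusableSinkGroup(
                    sink, digest, abilitySpecsOf(sink), sink.getInput().getTraitSet()));
        }
    }

    // For each group, union the inputs of all member sinks and keep a single (reused) sink.
    for (ReusableSinkGroup group : groups) {
        Sink reusedSink = group.firstSink(); // hypothetical accessor
        RelNode union = createUnion(group.allSinkInputs(), group.getInputTraitSet());
        result.add(reusedSink.copy(reusedSink.getTraitSet(), Collections.singletonList(union)));
    }
    return result;
}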
Sink Digest
The key to sink reuse is judging whether different sink nodes are equal. This can be achieved by extracting digest information from each sink node.
Factors considered for sink node digest:
- sink table identifier
- sink table schemas
- sink table hints
- sink table upsertMaterialize option for StreamingMode
- sink target columns
- target columns will be removed from the digest after the getTargetColumns API is removed from the context
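As a hedged illustration of how these factors could be combined into a digest string (the exact format is an implementation detail), the free variables below stand for the factors listed above:

// Illustrative only; tableIdentifier, schema, hints, upsertMaterialize and targetColumns
// stand for the digest factors listed above.
String sinkDigest = String.join("|",
        tableIdentifier.asSummaryString(),   // sink table identifier
        schema.toString(),                   // sink table schema
        hints.toString(),                    // sink table hints
        String.valueOf(upsertMaterialize),   // upsertMaterialize option (streaming mode)
        Arrays.deepToString(targetColumns)); // sink target columns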
ReusableSinkGroup
Sinks that are reusable should be grouped into a ReusableSinkGroup. Sinks in a ReusableSinkGroup share the same digest, sinkAbilitySpecs and inputTraitSet.
public class ReusableSinkGroup {

    private final List<Sink> originalSinks = new ArrayList<>();
    private final SinkAbilitySpec[] sinkAbilitySpecs;
    private final RelTraitSet inputTraitSet;
    private final String digest;

    ReusableSinkGroup(
            Sink sink,
            String digest,
            SinkAbilitySpec[] sinkAbilitySpecs,
            RelTraitSet inputTraitSet) {
        this.originalSinks.add(sink);
        this.sinkAbilitySpecs = sinkAbilitySpecs;
        this.inputTraitSet = inputTraitSet;
        this.digest = digest;
    }
}
Rejected Alternatives
Introduce SupportsSinkReuse Sink Ability
Introduce a new interface SupportsSinkReuse in the connector sink abilities:

@PublicEvolving
public interface SupportsSinkReuse {

    boolean equals(Object sink);
}
Connectors that need the sink reuse feature should implement this interface. The planner would use this interface instead of the digest string to tell whether two sink nodes are equal.
This is easy to implement for the planner, but it requires connectors to be adapted to enable the feature. As far as I can see, this would introduce extra trivial work that is not worthwhile.
Add Extra Option to Consider Target Columns in Sink Digest
For certain corner cases, users might want the target columns of sink nodes to be considered when telling whether they are equal. An extra option like table.optimizer.reuse-sink.ignore-target-columns with a default value of true could be introduced here.
However, introducing two extra options seems a little bit tedious. For those corner cases, users can simply choose not to enable the sink reuse feature.
Introduce SupportsMerge Sink Ability
Introduce a new interface SupportsMerge in the connector sink abilities. When the planner tries to reuse sinks with different target columns, this ability would merge the target columns from multiple sources and apply the merged information to the connector.
However, this would introduce extra complexity to the solution without additional benefits.
Compatibility, Deprecation, and Migration Plan
Connector developers should use the new interface SupportsTargetColumnWriting to apply target column writing. The API Context#getTargetColumns needs to be marked with @Deprecated in 2.1 and removed in 2.2 or later.
Test Plan
Add unit tests in BatchSinkReuseTest and StreamSinkReuseTest
Add integration tests in SinkReuseITCase

