
Motivation

The current implementation of Flink uses a ChangelogNormalize node to handle updates and deletes when processing records from sources like Kafka. The primary reason is that the upsert Kafka source treats all records as UPDATE_AFTER and represents deletes as partial records whose value is null, while Flink generally requires both UPDATE_BEFORE and full DELETE records for correct state management and accurate processing results.

However, there are scenarios where the requirement for UPDATE_BEFORE and full DELETEs can be relaxed. For instance, piping records from a Kafka source into a Kafka sink with the same upsert keys does not require the full normalization step: key-only DELETEs are sufficient, and the overhead introduced by the ChangelogNormalize node becomes unnecessary.

Another case where the ChangelogNormalize node can be removed is a join of two upsert tables on their upsert keys. Such queries work correctly with partial deletes: the join operation itself produces full DELETEs, which eliminates the need for the ChangelogNormalize node for state management and accurate processing results.

Currently, FlinkChangelogModeInferenceProgram checks whether a pipeline can work without UPDATE_BEFORE. It needs to be extended with information about partial deletes to further optimize the data processing pipeline.

Removing the ChangelogNormalize node for such queries can offer several advantages:

  1. Improved Efficiency: By skipping the normalization step where it is not needed, the data processing pipeline becomes more efficient. This can lead to reduced processing latency and lower resource consumption.

  2. Simplified Pipeline: The simplification of the execution plan by removing an extraneous node can make the pipeline easier to understand and maintain.

  3. Resource Optimization: Efficient utilization of resources can lead to cost savings, especially in environments with significant data loads and real-time processing requirements.

In summary, this proposal aims to optimize specific data processing scenarios in Flink by eliminating the redundant ChangelogNormalize step, thus enhancing overall performance and resource efficiency.

Public Interfaces

@PublicEvolving
public final class ChangelogMode {
...
    private final Set<RowKind> kinds;
    private final boolean keyOnlyDeletes;
    
    /** Returns whether DELETE records contain only the upsert key fields. */
    public boolean keyOnlyDeletes() {
        return keyOnlyDeletes;
    }
    
    /** Builder for configuring and creating instances of {@link ChangelogMode}. */
    @PublicEvolving
    public static class Builder {

        ...

        private boolean keyOnlyDeletes = false;

        public Builder keyOnlyDeletes(boolean flag) {
            this.keyOnlyDeletes = flag;
            return this;
        }

        public ChangelogMode build() {
            return new ChangelogMode(kinds, keyOnlyDeletes);
        }
    }
}
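
For illustration, a caller could configure and inspect the new property as follows (a usage sketch; only the keyOnlyDeletes calls are new, the remaining builder methods exist today):

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

// Sketch: an upsert changelog whose DELETE records carry only the key fields.
ChangelogMode mode =
        ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .addContainedKind(RowKind.DELETE)
                .keyOnlyDeletes(true)
                .build();

// Consumers such as the planner can then query the new property.
boolean partialDeletes = mode.keyOnlyDeletes();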

Proposed Changes

We extend ChangelogMode with information on whether DELETEs are represented by key only or require/produce the full record. We need to update the Kafka connector to provide that information properly. The upsert Kafka source is the only known source so far that produces deletes by key.
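
For example, the upsert Kafka source could declare this property in its ScanTableSource#getChangelogMode implementation roughly as follows (a sketch, not the final connector code; it mirrors today's ChangelogMode.upsert() plus the new flag):

@Override
public ChangelogMode getChangelogMode() {
    // Upsert Kafka emits inserts/updates as INSERT/UPDATE_AFTER records and
    // deletes as tombstones, i.e. DELETEs carrying the key with a null value.
    return ChangelogMode.newBuilder()
            .addContainedKind(RowKind.INSERT)
            .addContainedKind(RowKind.UPDATE_AFTER)
            .addContainedKind(RowKind.DELETE)
            .keyOnlyDeletes(true)
            .build();
}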

Additionally, we extend FlinkChangelogModeInferenceProgram to eliminate ChangelogNormalize if the pipeline can work without UPDATE_BEFORE and with key-only DELETEs. Similar work has been done for managed tables in the past: https://issues.apache.org/jira/browse/FLINK-25183
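
Conceptually, the additional check boils down to the following predicate (an illustrative sketch; the method name and surrounding planner code are hypothetical, only ChangelogMode#contains and the proposed keyOnlyDeletes flag are assumed):

// ChangelogNormalize can be dropped when no downstream operator needs
// UPDATE_BEFORE and the consumers of the source's changelog can work with
// key-only DELETEs (or the source produces full DELETEs anyway).
static boolean canEliminateChangelogNormalize(
        ChangelogMode producedMode, ChangelogMode requiredMode) {
    boolean updateBeforeNotNeeded = !requiredMode.contains(RowKind.UPDATE_BEFORE);
    boolean deletesCompatible =
            !producedMode.keyOnlyDeletes() || requiredMode.keyOnlyDeletes();
    return updateBeforeNotNeeded && deletesCompatible;
}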

Compatibility, Deprecation, and Migration Plan

Only newly compiled jobs will produce the optimized plan; plans compiled before this change remain unchanged.

Test Plan

Plan tests for eliminating ChangelogNormalize

IT tests for the optimized cases (a sketch of the first case follows the list):

  1. Pipe from upsert source into upsert sink

  2. Join in a pipeline
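
An illustrative shape of the first case (table names, topics, and connector option values are placeholders):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tEnv.executeSql(
        "CREATE TABLE src (id STRING, score INT, PRIMARY KEY (id) NOT ENFORCED) WITH ("
                + "'connector' = 'upsert-kafka', 'topic' = 'input', "
                + "'properties.bootstrap.servers' = 'localhost:9092', "
                + "'key.format' = 'json', 'value.format' = 'json')");
tEnv.executeSql(
        "CREATE TABLE snk (id STRING, score INT, PRIMARY KEY (id) NOT ENFORCED) WITH ("
                + "'connector' = 'upsert-kafka', 'topic' = 'output', "
                + "'properties.bootstrap.servers' = 'localhost:9092', "
                + "'key.format' = 'json', 'value.format' = 'json')");

// With the optimization in place, the plan for this statement should no
// longer contain a ChangelogNormalize node.
String plan = tEnv.explainSql("INSERT INTO snk SELECT * FROM src");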

Rejected Alternatives

  1. Add the information whether a sink supports DELETEs by key to the DynamicTableSink interface (and, analogously, whether a source produces full deletes to DynamicTableSource):

@PublicEvolving
public interface DynamicTableSink {
    default boolean requiresFullDeletes() {
        return true;
    }
}

@PublicEvolving
public interface DynamicTableSource {
    default boolean producesFullDeletes() {
        return true;
    }
}

The benefit of adding it to the ChangelogMode instead is that this way we can provide the same information in other places as well (see the sketch after this list), e.g. in

  • Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);

  • DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);
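
For instance, with the extended ChangelogMode a user could describe a changelog stream with key-only deletes like this (a minimal sketch, assuming the new keyOnlyDeletes builder flag; the schema and data are illustrative):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

DataStream<Row> changelog =
        env.fromElements(
                Row.ofKind(RowKind.UPDATE_AFTER, "user-1", 10),
                // a key-only delete: non-key fields are null
                Row.ofKind(RowKind.DELETE, "user-1", null));

Table table =
        tableEnv.fromChangelogStream(
                changelog,
                Schema.newBuilder()
                        .column("id", DataTypes.STRING().notNull())
                        .column("score", DataTypes.INT())
                        .primaryKey("id")
                        .build(),
                ChangelogMode.newBuilder()
                        .addContainedKind(RowKind.UPDATE_AFTER)
                        .addContainedKind(RowKind.DELETE)
                        .keyOnlyDeletes(true)
                        .build());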