Motivation and goals

Why The Sink Upsert Materializer (SUM) is Needed, and Its Cost

The SUM is required when the upsert key (the unique identifier provided by the stream) is different from the primary key (the unique identifier in the target sink table). This happens in scenarios like multi-stage transformations, projections, or joins.

In this situation, the SUM acts as a stateful operator to correctly handle two challenges:

  1. Key Mismatches and Conflicts: It must resolve what to do when a change record for Key_A arrives, but the sink already has a row for PK_X (where PK_X was derived from Key_A in an earlier step).

  2. Changelog Disorder: Out-of-order updates can complicate the conflict resolution.

The Problem of State: The "Rollback" Scenario

To correctly handle retractions (updates that invalidate a previous record) in a mixed-key scenario, the current SUM logic implements a "rollback" mechanism. This means it must keep a history of all updates for a given Primary Key so that if an intermediate value is retracted, it can roll back to the previous valid value.

This "rollback" behavior is the major source of the high state size problem, as it requires the Flink state to track the entire history of changes for an active Primary Key.

Example for a rollback:

CREATE TABLE inventory(
   name VARCHAR(255) PRIMARY KEY,
   quantity INT NOT NULL
);

CREATE TABLE orders(
  product_id BIGINT PRIMARY KEY,
  quantity BIGINT NOT NULL
);

CREATE TABLE products(
  id BIGINT PRIMARY KEY,
  name VARCHAR(255) NOT NULL
);

-- products
id | name
---+-----------
1  | Football
2  | Football
3  | Basketball

-- Query:
INSERT INTO inventory
SELECT name, quantity
FROM (
   SELECT product_id, SUM(quantity) as quantity
   FROM orders
   GROUP BY product_id
) o JOIN products p ON o.product_id = p.id
WHERE quantity < 10

-- records arriving from orders table
1) +U(product_id = 1, quantity=5) 
2) +U(product_id = 2, quantity=6)
3) +U(product_id = 2, quantity=7)

-- records arriving at SUM
triggered by 1) +U(name=Football, quantity=5) // we emit +U(name=Football, quantity=5)

triggered by 2) +U(name=Football, quantity=6) // this originates from a different product, 
                                              // but has the same name, we take the latest
                                              // value +U(name=Football, quantity=6)
                                              
triggered by 3) -U(name=Football, quantity=6) // the value for product_id=2 is no longer valid,
                                              // but the quantity for product_id=1, which is also
                                              // "Football" is still "active"
                                              // we "rollback" emitting: +U(name=Football, quantity=5)
triggered by 3) +U(name=Football, quantity=13) // filtered out by the WHERE quantity < 10
                                              // condition, so it never reaches the SUM

Note: The above example is rather artificial. The user should either GROUP BY the name as well or use the LAST_VALUE function instead of deduplicating in the sink.

The example is only meant to explain the behaviour. I could not come up with a real-world scenario where the deduplicating logic is a must-have; happy to replace the example with a more applicable one.

As the example shows, the "rollback" behaviour requires tracking all historical updates for an active primary key, which is the root cause of SUM's big state problem.
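The bookkeeping behind this rollback can be sketched in plain Java (a simplification: a HashMap and a list of quantities stand in for Flink state, and a retraction is matched by value only, whereas the real SUM state handles full rows):

```java
import java.util.*;

public class RollbackSketch {
    // per primary key (name), SUM keeps the full history of active values
    static final Map<String, List<Integer>> history = new HashMap<>();

    // +U: append the new value and emit it as the latest one for the key
    static String add(String name, int quantity) {
        history.computeIfAbsent(name, k -> new ArrayList<>()).add(quantity);
        return "+U(name=" + name + ", quantity=" + quantity + ")";
    }

    // -U: remove the retracted value; if an older value is still active,
    // "roll back" by re-emitting it, otherwise delete the row
    static String retract(String name, int quantity) {
        List<Integer> values = history.get(name);
        values.remove(Integer.valueOf(quantity));
        if (values.isEmpty()) {
            return "-D(name=" + name + ")";
        }
        return "+U(name=" + name + ", quantity=" + values.get(values.size() - 1) + ")";
    }

    public static void main(String[] args) {
        System.out.println(add("Football", 5));     // +U(name=Football, quantity=5)
        System.out.println(add("Football", 6));     // +U(name=Football, quantity=6)
        // retracting the latest value rolls back to quantity=5
        System.out.println(retract("Football", 6)); // +U(name=Football, quantity=5)
    }
}
```

The history list grows with every non-retracted update, which is exactly why this state can become large.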

Unifying the Design: Performance and Correctness

The proposals in this FLIP address the poor performance and high resource consumption caused by Flink's current implicit handling of data integrity issues, specifically when the upsert key of a stream differs from the PRIMARY KEY of the sink (the use case requiring the Sink Upsert Materializer, or SUM).

These issues require a unified solution across three fronts to achieve our performance goals:

1. Changelog Disorder Fix (Compaction on Watermark)

  • Problem solved: Correctness/consistency — ensures a stream of UB/UA events for a single key arrives in the correct, final order, preventing intermediate/flickering results.

  • Performance/resource impact: improves correctness and data integrity by delaying output until a consistent state is reached.

  • Interaction: ensures that the new ON CONFLICT strategies operate on a correctly ordered set of changes.

2. State Size Reduction (New ON CONFLICT Syntax)

  • Problem solved: Resource usage — the current SUM stores the entire history of changes for "rollback" behavior, leading to massive state sizes.

  • Performance/resource impact: dramatically reduces state size and memory footprint by allowing users to choose simpler, non-rollback strategies (DO NOTHING, DO ERROR).

  • Interaction: allows users to bypass the expensive logic required by the Changelog Disorder Fix, leading to a net performance gain.

3. Default Behavior Change (Force Explicit ON CONFLICT)

  • Problem solved: User awareness/adoption — users are often unaware that the current implicit behavior is highly resource-intensive.

  • Performance/resource impact: drives widespread performance improvement by forcing users to acknowledge and select a more optimized conflict resolution strategy.

  • Interaction: guarantees that users are exposed to the state size reduction options, ensuring the long-term adoption of optimized queries.

In summary, the Changelog Disorder Fix ensures the correctness of the operations, while the new ON CONFLICT syntax and modified default behavior provide the path to significantly reduced state size and better overall performance.

Design considerations

When thinking about improvements, I took the following concepts into consideration:

1. Internal Consistency

  • Objective: Avoid intermediate results for a UB/UA pair.

Example:

+I(id=1, level=10, attr='a1')
-U(id=1, level=10, attr='a1') // this record is unnecessary, because the +U would be enough
+U(id=1, level=20, attr='b1')


  • Rationale: Ensures that only finalized, consistent states are visible, preventing confusion from transient or partial updates.

"total needs to wait until it's seen all the updates that correspond to the original transaction before emitting an output."

"stream does not distinguish between changes to the correct value and corrections of intermediate outputs"

reference: https://www.scattered-thoughts.net/writing/internal-consistency-in-streaming-systems/

2. Time-Based Consistency

  • Objective: Guarantee complete results at a specific time t (e.g., a JOIN result at time t without intermediate flickering from input tables at t-x and t-y).

  • Rationale: Users and downstream systems should see a stable, consistent snapshot at any given time.

3. Reducing State Size

  • Problem: The "rollback" scenario, where it may be necessary to revert to a value from before an incoming UB (see the example in the motivation section).

  • Potential Solution: Consider disabling rollback behavior to simplify state management and reduce memory usage.



IN(UP)SERTs and Conflicts

How RDBMSs Handle It

Inserting into a table with a primary key (PK) where duplicates may occur (i.e., the upsert key ≠ primary key) is a challenging problem even for traditional RDBMSs.

References:

INSERT ON CONFLICT (PostgreSQL Example)

[ WITH [ RECURSIVE ] with_query [, ...] ]
INSERT INTO table_name [ AS alias ] [ ( column_name [, ...] ) ]
    ....
    [ ON CONFLICT [ conflict_target ] conflict_action ]
    ...

where conflict_target can be one of:

    ( { index_column_name | ( index_expression ) } [ COLLATE collation ] [ opclass ] [, ...] ) [ WHERE index_predicate ]
    ON CONSTRAINT constraint_name

and conflict_action is one of:

    DO NOTHING
    DO UPDATE SET { column_name = { expression | DEFAULT } |
                    ( column_name [, ...] ) = [ ROW ] ( { expression | DEFAULT } [, ...] ) |
                    ( column_name [, ...] ) = ( sub-SELECT )
                  } [, ...]
              [ WHERE condition ]


Example:

CREATE TABLE inventory(
   id INT PRIMARY KEY,
   name VARCHAR(255) NOT NULL,
   price DECIMAL(10,2) NOT NULL,
   quantity INT NOT NULL
);

-- Regular insert fails with constraint violation
INSERT INTO inventory (id, name, price, quantity)
VALUES
    (1, 'A', 15.99, 100),
    (2, 'B', 25.49, 50),
    (3, 'C', 19.95, 75),
    (1, 'A', 16.99, 120);

-- output
duplicate key value violates unique constraint "inventory_pkey" (SQLSTATE 23505)

-- User wants to insert or update (if the id already exists)
INSERT INTO inventory (id, name, price, quantity)
VALUES
    (1, 'A', 15.99, 100),
    (2, 'B', 25.49, 50),
    (3, 'C', 19.95, 75),
    (1, 'A', 16.99, 120)
ON CONFLICT(id)
DO UPDATE SET
  price = EXCLUDED.price,
  quantity = EXCLUDED.quantity;

-- The table is then
id | name | price | quantity
----+------+-------+----------
  1 | A    | 16.99 |      120
  2 | B    | 25.49 |       50
  3 | C    | 19.95 |       75

How is it different in Flink?

The problem itself is not so different in Flink. The difference comes from the fact that we have different changelog modes:

  • append

  • retract

  • upsert

In the cases from the previous paragraph, all the INSERTs behave as if the stream was in append mode.

In Flink

  • append is the same

  • retract is like an append, but each row can be retracted,

    • we need to deal not only with what happens if a key already exists in the target table, but also with how to retract a value

  • for an upsert stream there are two cases

    • upsert key == primary key => we don't need to convert anything, as the underlying concepts match; we maintain the information whether it was an update or an insert in the target table

    • upsert key <> primary key => this is very similar to retract, we need to deal with conflicts


What Does Sink Upsert Materializer (SUM) Do?

SUM acts like a custom conflict handler in an INSERT ON CONFLICT  clause:

INSERT ...
ON CONFLICT(...)
DO SinkUpsertMaterializer(...)

  • On New Row: Generates an UPDATE equivalent to:

    INSERT INTO inventory (id, name, price, quantity)
    SELECT ...
    ON CONFLICT(id)
    DO UPDATE SET
      id = EXCLUDED.id,
      name = EXCLUDED.name,
      price = EXCLUDED.price,
      quantity = EXCLUDED.quantity;


  • On Retraction: Rolls back to the previous value, requiring tracking of historical updates. (see the example from the motivation)



Changelog disorder

This is a self-incurred problem, caused by records travelling through different paths and by different processing delays on each path.

Example of Changelog Disorder

Given the following tables:

reference: https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-event-out-of-orderness

-- CDC source tables:  s1 & s2
s1: id BIGINT, level BIGINT, PRIMARY KEY(id)
s2: id BIGINT, attr VARCHAR, PRIMARY KEY(id)
-- sink table: t1
t1: id BIGINT, level BIGINT, attr VARCHAR, PRIMARY KEY(id)
-- join s1 and s2 and insert the result into t1 
INSERT INTO t1
SELECT 
  s1.*, s2.attr
FROM s1 JOIN s2
  ON s1.level = s2.id


An ideal flow of messages could look like:

Notes:

  • The change in s1  propagates through the pipeline to the sink.

  • Values change from First  → Then  → Finally 

  • Since s1.level is not a primary key, a retraction (-U) will be generated by a ChangelogNormalize (not shown in the diagram) as part of s1 

  • -U and +U are not related by any key and thus take independent paths.

The diagram shows how events arrive at the sink in…

ideal order (1)

+I(id=1, level=10, attr='a1')
-U(id=1, level=10, attr='a1')
+U(id=1, level=20, attr='b1')

However, shuffling can also lead to:

disorder (2)

+U(id=1, level=20, attr='b1')
+I(id=1, level=10, attr='a1')
-U(id=1, level=10, attr='a1')

disorder (3)

+I(id=1, level=10, attr='a1')
+U(id=1, level=20, attr='b1')
-U(id=1, level=10, attr='a1')

Disorder properties:

  • The "retraction" (-U) always takes the same path as the previous "add" (+I) and is thus always in order.

  • The "add" (+U) is not related to the retraction pair (+I/-U) by any key and travels independently.

  • The "add" (+U) might therefore arrive before the retraction pair (+I/-U).

What can we do…

… to fix the internal consistency 

We need a distributed barrier to know for sure that a pair of UB and UA has been processed through the whole pipeline. This barrier needs two properties:

  1. It is never generated between a UB/UA pair.

  2. It never overtakes records.

Good candidates for that are watermarks or checkpoints, but only aligned ones (unaligned checkpoints can overtake records).

Watermarks:

Benefits:

  • more frequent

  • never overtake records

  • we can use generalized watermarks (https://cwiki.apache.org/confluence/x/oA6TEg ) and introduce a regular specialised watermarks only for the purpose of SUM

  • operators like joins get better point-in-time semantics, where the output watermark marks the point in time of a result.

Disadvantages:

  • Records “belonging” to different watermarks

    • at any point in time in SUM, we may have records that need a progression of the watermark to values W[3, 5, 8, 11, …] before they can be compacted. This is because there may be arbitrarily many watermarks flowing through the pipeline at the same time. This requires careful handling and preferably an ordered map state.

    • in case of checkpoints we usually have only one checkpoint pending (theoretically users can configure more concurrent pending checkpoints, but that's almost never the case). Therefore records belong either to the current checkpoint or to the next one.

  • idle partitions or source problems, which can lead to stalled output

Checkpoints:

Benefits:

  • more reliable

  • single version of a barrier across the entire pipeline. A checkpoint (chk-1) is injected to all the source tasks at the same time

Disadvantages:

  • does not work with unaligned checkpoints

  • usually less frequent

  • fan-out operators (1-to-many joins, window operators) can stall progress similarly to idle partitions

Note: The solution is based on the fact that UB/UA are generated by the ChangelogNormalize or other intermediate nodes. If we had a source that produces UB/UA the solution may not apply. Such a source could produce corresponding UB/UA messages through different partitions or just insert watermarks in between.



…to fix state size

Support the ON CONFLICT clause, giving users the chance to implement more optimal versions of SUM that are tailored to the particular query, e.g.

  1. If they're confident there are no conflicts, they can use DO NOTHING or DO ERROR.

  2. They may be able to accumulate all versions in a single ROW, e.g. track a running sum where they can add on update and subtract on retract.

  3. …


Proposed changes

Syntax extension

We should extend the INSERT INTO syntax to support an ON CONFLICT clause:

INSERT INTO table_name [ AS alias ] [ ( column_name [, ...] ) ]
    ....
    [ ON CONFLICT conflict_action ]
    ...

and conflict_action is one of:
    DO NOTHING
    DO ERROR
    DO DEDUPLICATE
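For illustration only (the clause placement mirrors PostgreSQL and is an assumption here; the exact grammar would be settled during implementation), the query from the motivation section could then opt into the current deduplicating behaviour explicitly:

```sql
INSERT INTO inventory
SELECT name, quantity
FROM (
   SELECT product_id, SUM(quantity) AS quantity
   FROM orders
   GROUP BY product_id
) o JOIN products p ON o.product_id = p.id
WHERE quantity < 10
ON CONFLICT DO DEDUPLICATE
```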


Default behaviour change

If no ON CONFLICT DO ... is provided and the upsert key is different from the primary key of the sink, we would fail during planning.

This forces users to reconsider the use case before submission. There are three situations in which the upsert key differs from the primary key:

  1. An issue in the logic, e.g. missing GROUP BY

  2. The planner is not smart enough to forward a full upsert key through the entire pipeline, e.g. a JOIN using a column that is a function of a PRIMARY KEY of the source

  3. It is a valid use case where a user wants to deduplicate messages arriving for the PK

We suggest that by default we assume it is the situation described in 1). In this scenario we should fail hard and force the user to rewrite the query.

If it falls into 2), the user should use DO ERROR.

If it falls into 3), they can use one of the provided strategies depending on their use case: DO NOTHING/DO ERROR/DO DEDUPLICATE.

ON CONFLICT DO ERROR

If DO ERROR is chosen, we will potentially throw an exception on a primary key constraint violation at runtime. We would expect only a single source of data for a given PK.

Therefore we would throw if:

  1. Multiple non-retracted records with the same PK arrive (valid for retract mode without upsert key)

  2. Multiple records with the same PK, but different upsert key arrive

In order to solve the disorder problem we would compact and emit only on a watermark, as explained above.

ON CONFLICT DO NOTHING

Additionally, we could provide the DO NOTHING option, which would be similar to the above, but instead of throwing an exception it would keep the first record that arrived and ignore the others.

Backwards compatibility and deduplicating logic

The ON CONFLICT DO DEDUPLICATE option would use the current logic of keeping the entire history of changes and using the latest active record.

The solution is based on the assumption that a barrier is never inserted between a UB/UA pair. This may not hold for retract sources where UB/UA messages are stored separately. In the first version we would prohibit the DO ERROR/DO NOTHING behaviours for retract sources.



Implementation details

Assigning timestamps

The above diagram shows the need to compact records based on the watermarks that arrived on the same input as the corresponding record.

The dotted line separates two inputs to a SUM operator. At the point when watermark W=4 arrives the combined watermark of the SUM operator progresses to 3. This means we can compact the “I” record, but we should not compact the UB on the second input, nor the UB, UA pair from the first input.

Therefore we need to assign each incoming record timestamp = currentWatermarkOfInput + 1 and compact records with timestamp <= currentCombinedWatermark.

This algorithm ensures there are no late events. The timestamps are assigned in the SUM operator, based on the incoming watermarks, so the timestamps will never lag behind the watermarks.

Pseudo code:

// key: the timestamp assigned to the record (current watermark of the
//      record's input + 1); value: the records buffered for that timestamp
MapState<Long, List<RowData>> buffer;

// The current value for the primary key being processed
ValueState<RowData> currentValue;

// before we emit a combined watermark, we emit a compacted record
long combinedWatermark = ...;
RowData result = currentValue.value();
Iterator<Map.Entry<Long, List<RowData>>> iter = buffer.iterator();
while (iter.hasNext()) {
    Map.Entry<Long, List<RowData>> entry = iter.next();
    if (entry.getKey() > combinedWatermark) {
        break; // relies on an ordered map state
    }
    for (RowData change : entry.getValue()) {
        result = compact(result, change);
    }
    iter.remove();
}
currentValue.update(result);
emit(result);
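The pseudo code can be exercised with a small runnable simulation (plain Java; a TreeMap stands in for Flink's ordered map state, records are plain strings, and compact() is simplified to "keep the latest change", an assumption for illustration only):

```java
import java.util.*;

public class WatermarkCompaction {
    // buffer for one primary key: assigned timestamp -> buffered changes
    static final TreeMap<Long, List<String>> buffer = new TreeMap<>();
    static String currentValue = null;

    // each record gets timestamp = current watermark of its input + 1
    static void onRecord(String change, long inputWatermark) {
        buffer.computeIfAbsent(inputWatermark + 1, k -> new ArrayList<>()).add(change);
    }

    // on watermark progression: compact everything with timestamp <= combined watermark
    static List<String> onWatermark(long combinedWatermark) {
        List<String> emitted = new ArrayList<>();
        boolean changed = false;
        Iterator<Map.Entry<Long, List<String>>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> entry = it.next();
            if (entry.getKey() > combinedWatermark) {
                break; // later records must wait for further watermark progress
            }
            for (String change : entry.getValue()) {
                currentValue = change; // simplified compact(): keep the latest change
                changed = true;
            }
            it.remove();
        }
        if (changed) {
            emitted.add(currentValue); // emit a single compacted record
        }
        return emitted;
    }

    public static void main(String[] args) {
        onRecord("+I(id=1, level=10, attr='a1')", 2); // buffered with timestamp 3
        onRecord("+U(id=1, level=20, attr='b1')", 4); // buffered with timestamp 5
        System.out.println(onWatermark(3)); // only the +I is compacted and emitted
        System.out.println(onWatermark(5)); // now the +U is compacted and emitted
    }
}
```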


Compaction:

The compaction applies all buffered changes and should produce a single value at the end without emitting intermediate results.

For example for disorder (2):

+U(id=1, level=20, attr='b1')
+I(id=1, level=10, attr='a1')
-U(id=1, level=10, attr='a1')


  1. State is empty, we start with an empty list.

  2. we get +U(id=1, level=20, attr='b1'), we append the record to a temporary list: [+U(id=1, level=20, attr='b1')]

  3. we get +I(id=1, level=10, attr='a1'), we append it [+U(id=1, level=20, attr='b1'), +I(id=1, level=10, attr='a1')]

  4. we get -U(id=1, level=10, attr='a1'), we remove the corresponding record from the list [+U(id=1, level=20, attr='b1')]

  5. we processed all buffered records

    1. if the list has 1 record, we emit +U

    2. if the list has > 1 record, we throw an exception, because of a PK constraint violation

    3. if the list has 0 records, we emit -D
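The steps above can be sketched as a small standalone routine (plain Java; change records are encoded as strings whose first two characters carry the change kind, an assumption made for this sketch):

```java
import java.util.*;

public class CompactBufferedChanges {
    // applies all buffered changes for one primary key and produces a single
    // final change, following steps 1-5 above
    static String compact(List<String> bufferedChanges) {
        List<String> active = new ArrayList<>();
        for (String change : bufferedChanges) {
            String kind = change.substring(0, 2);
            String value = change.substring(2);
            if (kind.equals("+I") || kind.equals("+U")) {
                active.add(value);    // additions are appended to the list
            } else {
                active.remove(value); // retractions remove the matching record
            }
        }
        if (active.size() == 1) {
            return "+U" + active.get(0);
        }
        if (active.isEmpty()) {
            return "-D"; // everything was retracted
        }
        // more than one non-retracted record violates the PK constraint
        throw new IllegalStateException("PK constraint violation");
    }

    public static void main(String[] args) {
        // disorder (2) from above compacts to a single +U
        List<String> disorder2 = List.of(
                "+U(id=1, level=20, attr='b1')",
                "+I(id=1, level=10, attr='a1')",
                "-U(id=1, level=10, attr='a1')");
        System.out.println(compact(disorder2)); // +U(id=1, level=20, attr='b1')
    }
}
```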

Configuration changes

For a correct implementation of ON CONFLICT DO ERROR/DO NOTHING we need watermarks flowing. There are a few options for ensuring that is the case.

Option 1

We leverage existing watermarks that are defined. In case time attributes are not present in the query we would fail such queries during planning.

Option 2

As explained in fix-consistency we could use aligned checkpoints as well. Therefore we can introduce an option where a user can control if watermarks or checkpoints should be used:

ConfigOptions.key("table.exec.sink.upserts.compaction-mode")
    .enumType(SinkUpsertMaterializerCompactionMode.class)
    .defaultValue(SinkUpsertMaterializerCompactionMode.WATERMARK) // CHECKPOINT or WATERMARK

Option 3

The above option could be further extended with e.g. SinkUpsertMaterializerCompactionMode.AUTO_WATERMARK which would add dedicated automatic watermarks in case of SUM with an interval controlled with e.g.

ConfigOptions.key("table.exec.sink.upserts.compaction-interval")
    .durationType()

The three options don't need to be implemented all at once. They can build incrementally on top of each other.


Future possible extensions

We would not introduce the fully customisable option (DO SET) letting users define their own deduplication logic, at least not in the first version. This would require users to deal with retractions, which is a complex task.

If necessary, we can extend the strategies in the future by introducing additional options alongside DO NOTHING, DO DEDUPLICATE, …

Rejected alternatives

Introduce RowId/ChangeId

Goal:

  • Attach more metadata to each row to fix the changelog disorder

  • Relate the 2 artifacts (- and +) of an update (i.e. change)

General design:

  • Every source and row producing operator attaches a Change ID to in-flight rows.

  • The Change ID allows tracing the origin of a change in (potentially a large bulk of) retraction messages.

  • Most operators such as Calc and even Join only forward the change ID.

  • The concept of a Change ID is similar to Lamport timestamps

  • The Change ID is a space-efficient 15-byte value (plus an optional variable-length part) that encodes:

    • 1 byte: header (0 = no variable length, 1 = with variable length, and future evolution)

    • 2 bytes: ID of the subtask

    • 8 bytes: event time timestamp

    • 4 bytes: uniqueness component (counter)

    • n bytes: variable length component

  • Reasoning:

    • The subtask component uniquely identifies the location.

    • The timestamp allows relating rows to completeness barriers (i.e. watermarks).

    • The counter for avoiding duplicates without generating an expensive UUID or state access. The counter resets with every newly seen timestamp.

    • The header component enables evolution.

    • The prepared variable length part allows to attach additional information such as Kafka topic/partition/offset, Debezium Transaction Ids, or other consistency related metadata for lineage.

  • Note: The Change ID is not persisted in state except for the sink. It is a pure in-flight value.

    • We can further evolve the Change ID in the future.

    • It should be a moderate level of effort (LOE) to update the code base for this.

  • Watermarks allow to finalize a change (as described above)
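The fixed-size part of the encoding could look as follows (a sketch; the field order and big-endian layout are assumptions, only the byte budget comes from the list above):

```java
import java.nio.ByteBuffer;

public class ChangeId {
    // 1 + 2 + 8 + 4 = 15 bytes for the fixed-size part of the Change ID
    static byte[] encode(byte header, short subtaskId, long timestamp, int counter) {
        return ByteBuffer.allocate(15)
                .put(header)          // 0 = no variable-length part
                .putShort(subtaskId)  // location of the producing subtask
                .putLong(timestamp)   // event time timestamp
                .putInt(counter)      // resets with every newly seen timestamp
                .array();
    }

    public static void main(String[] args) {
        byte[] id = encode((byte) 0, (short) 7, 1_700_000_000_000L, 42);
        System.out.println(id.length); // 15
    }
}
```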

The difference to the suggested solution

The main difference is that the timestamp is assigned at the source, where the UB/UA is created. This would let us compact some of the records on the fly instead of compacting only on watermarks.

For example:

For disorder (2)

+U(id=1, level=20, attr='b1') [t = 5]
+I(id=1, level=10, attr='a1') [t = 1]
-U(id=1, level=10, attr='a1') [t = 5]


Steps:

  • Input: +U(id=1, level=20, attr='b1') [t = 5]

  • State is empty -> emit +I and store [t = 5]

  • Input: +I(id=1, level=10, attr='a1') [t = 1]

  • State is not empty: Compare timestamp.

    • 1 < 5 → discard +I

  • Input: -U(id=1, level=10, attr='a1') [t = 5]

  • State is not empty: Compare lineage by backtracking.

    • t = 5 matches t = 5 → discard -U

Output:

+I(id=1, level=20, attr='b1')


The solution can help us solve the changelog disorder problem, but it does not help with the internal consistency issue. If we want to fix that as well, we still need the compaction on watermarks. At the same time, it increases the size of all in-flight records. Therefore it was rejected in favour of simply compacting all records once on the progression of watermarks.