Motivation and goals
Why The Sink Upsert Materializer (SUM) is Needed, and Its Cost
The SUM is required when the upsert key (the unique identifier provided by the stream) is different from the primary key (the unique identifier in the target sink table). This happens in scenarios like multi-stage transformations, projections, or Joins.
In this situation, the SUM acts as a stateful operator to correctly handle two challenges:
Key Mismatches and Conflicts: It must resolve what to do when a change record for Key_A arrives, but the sink already has a row for PK_X (where PK_X was derived from Key_A in an earlier step).
Changelog Disorder: Out-of-order updates can complicate the conflict resolution.
The Problem of State: The "Rollback" Scenario
To correctly handle retractions (updates that invalidate a previous record) in a mixed-key scenario, the current SUM logic implements a "rollback" mechanism. This means it must keep a history of all updates for a given Primary Key so that if an intermediate value is retracted, it can roll back to the previous valid value.
This "rollback" behavior is the major source of the high state size problem, as it requires the Flink state to track the entire history of changes for an active Primary Key.
Example for a rollback:
CREATE TABLE inventory(
name VARCHAR(255) PRIMARY KEY,
quantity INT NOT NULL
);
CREATE TABLE orders(
product_id BIGINT PRIMARY KEY,
quantity BIGINT NOT NULL
);
CREATE TABLE products(
id BIGINT PRIMARY KEY,
name VARCHAR(255) NOT NULL
);
-- products
id | name
---+-----------
1 | Football
2 | Football
3 | Basketball
-- Query:
INSERT INTO inventory
SELECT name, quantity
FROM (
SELECT product_id, SUM(quantity) as quantity
FROM orders
GROUP BY product_id
) o JOIN products p ON o.product_id = p.id
WHERE quantity < 10
-- records arriving from orders table
1) +U(product_id = 1, quantity=5)
2) +U(product_id = 2, quantity=6)
3) +U(product_id = 2, quantity=7)
-- records arriving at SUM
triggered by 1) +U(name=Football, quantity=5) // we emit that +U(name=Football, quantity=5)
triggered by 2) +U(name=Football, quantity=6) // this originates from a different product,
// but has the same name, we take the latest
// value +U(name=Football, quantity=6)
triggered by 3) -U(name=Football, quantity=6) // the value for product_id=2 is no longer valid,
// but the quantity for product_id=1, which is also
// "Football" is still "active"
// we "rollback" emitting: +U(name=Football, quantity=5)
triggered by 3) +U(name=Football, quantity=13) // filtered out by the WHERE quantity < 10 condition
Note: The above example is rather artificial. The user should either GROUP BY the name as well or use LAST_VALUE function instead of deduplicating in the sink.
The example is supposed to explain the behaviour. I could not come up with a real world scenario where the deduplicating logic is a must have. Happy to replace the example with a more applicable one.
The “rollback” behaviour is the major source of SUM's large state problem, because we need to track all historical updates for an active primary key.
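To make the rollback behaviour concrete, the history-tracking logic can be sketched in a few lines of Java. This is an illustrative model only (class and method names are made up, and values are simplified to strings), not the actual SinkUpsertMaterializer implementation:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the per-primary-key "rollback" history kept by SUM.
// All names are illustrative; this is not the actual Flink implementation.
class RollbackHistory {
    // full history of active values for one primary key
    private final List<String> history = new ArrayList<>();

    // An add (+I/+U) appends to the history; the latest entry is emitted.
    String add(String value) {
        history.add(value);
        return "+U(" + value + ")";
    }

    // A retraction (-U/-D) removes the matching entry. If an older entry
    // remains, SUM "rolls back" by re-emitting it; otherwise it deletes.
    String retract(String value) {
        history.remove(value);
        if (history.isEmpty()) {
            return "-D(" + value + ")";
        }
        return "+U(" + history.get(history.size() - 1) + ")";
    }
}
```

Replaying the example: adding quantity=5 and quantity=6 and then retracting quantity=6 rolls back to quantity=5, which is exactly why the full history must be kept in state.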
Unifying the Design: Performance and Correctness
The proposals in this FLIP address the poor performance and high resource consumption caused by Flink's current implicit handling of data integrity issues, specifically when the upsert key of a stream differs from the PRIMARY KEY of the sink (the use case requiring the Sink Upsert Materializer, or SUM).
These issues require a unified solution across three fronts to achieve our performance goals:
| Design Area | Problem Solved | Performance/Resource Impact | How It Interacts |
| --- | --- | --- | --- |
| 1. Changelog Disorder Fix (Compaction on Watermark) | Correctness/Consistency: Ensures a stream of UB/UA events for a single key arrives in the correct, final order, preventing intermediate/flickering results. | Improves Correctness and Data Integrity by delaying output until a consistent state is reached. | This fix ensures that the new ON CONFLICT strategies operate on a correctly ordered set of changes. |
| 2. State Size Reduction (New ON CONFLICT Syntax) | Resource Usage: The current SUM stores the entire history of changes for "rollback" behavior, leading to massive state sizes. | Dramatically reduces State Size and Memory Footprint by allowing users to choose simpler, non-rollback strategies (DO NOTHING, DO ERROR). | Allows users to bypass the expensive logic required by the Changelog Disorder Fix, leading to a net performance gain. |
| 3. Default Behavior Change (Force Explicit ON CONFLICT) | User Awareness/Adoption: Users are often unaware the current implicit behavior is highly resource-intensive. | Drives Widespread Performance Improvement by forcing users to acknowledge and select a more optimized conflict resolution strategy. | Guarantees that users are exposed to the State Size Reduction options, ensuring the long-term adoption of optimized queries. |
In summary, the Changelog Disorder Fix ensures the correctness of the operations, while the new ON CONFLICT syntax and modified default behavior provide the path to significantly reduced state size and better overall performance.
Design considerations
When thinking about improvements, I took the following concepts into consideration:
1. Internal Consistency
Objective: Avoid intermediate results for a UB/UA pair.
Example:
+I(id=1, level=10, attr='a1')
-U(id=1, level=10, attr='a1') // this record is unnecessary, because the +U would be enough
+U(id=1, level=20, attr='b1')
Rationale: Ensures that only finalized, consistent states are visible, preventing confusion from transient or partial updates.
"total needs to wait until it's seen all the updates that correspond to the original transaction before emitting an output.”
“stream does not distinguish between changes to the correct value and corrections of intermediate outputs”
reference: https://www.scattered-thoughts.net/writing/internal-consistency-in-streaming-systems/
2. Time-Based Consistency
Objective: Guarantee complete results at a specific time t (e.g., a JOIN result at time t without intermediate flickering from input tables at t-x and t-y).
Rationale: Users and downstream systems should see a stable, consistent snapshot at any given time.
3. Reducing State Size
Problem: The "rollback" scenario, where it may be necessary to revert to a value from before an incoming UB. (example)
Potential Solution: Consider disabling rollback behavior to simplify state management and reduce memory usage.
IN(UP)SERTs and Conflicts
How RDBMSs Handle It
Inserting into a table with a primary key (PK) where duplicates may occur (i.e., the UPSERT key ≠ PRIMARY KEY) is a well-known problem that traditional RDBMSs address with dedicated syntax:
References:
SQL Server MERGE Statement (MERGE)
INSERT ON CONFLICT (PostgreSQL Example)
[ WITH [ RECURSIVE ] with_query [, ...] ]
INSERT INTO table_name [ AS alias ] [ ( column_name [, ...] ) ]
....
[ ON CONFLICT [ conflict_target ] conflict_action ]
...
where conflict_target can be one of:
( { index_column_name | ( index_expression ) } [ COLLATE collation ] [ opclass ] [, ...] ) [ WHERE index_predicate ]
ON CONSTRAINT constraint_name
and conflict_action is one of:
DO NOTHING
DO UPDATE SET { column_name = { expression | DEFAULT } |
( column_name [, ...] ) = [ ROW ] ( { expression | DEFAULT } [, ...] ) |
( column_name [, ...] ) = ( sub-SELECT )
} [, ...]
[ WHERE condition ]
Example:
CREATE TABLE inventory(
id INT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
price DECIMAL(10,2) NOT NULL,
quantity INT NOT NULL
);
-- Regular insert fails with constraint violation
INSERT INTO inventory (id, name, price, quantity)
VALUES
(1, 'A', 15.99, 100),
(2, 'B', 25.49, 50),
(3, 'C', 19.95, 75),
(1, 'A', 16.99, 120)
-- output
duplicate key value violates unique constraint "your_table_pkey" (SQLSTATE 23505)
-- User wants to insert or update (if the id already exists)
INSERT INTO inventory (id, name, price, quantity)
VALUES
(1, 'A', 15.99, 100),
(2, 'B', 25.49, 50),
(3, 'C', 19.95, 75),
(1, 'A', 16.99, 120)
ON CONFLICT(id)
DO UPDATE SET
price = EXCLUDED.price,
quantity = EXCLUDED.quantity;
-- The table is then
id | name | price | quantity
----+------+-------+----------
1 | A | 16.99 | 120
2 | B | 25.49 | 50
3 | C | 19.95 | 75
How is it different in Flink?
The above problem is actually not so different in Flink. The difference comes from the fact that Flink has different changelog modes:
append
retract
upsert
In the RDBMS cases from the previous paragraph, all the INSERTs behave as if the stream were in append mode.
In Flink
append is the same
retract is like an append, but each row can be retracted,
we need to deal not only with what happens if a key already exists in the target table, but also with how to retract the value
for an upsert stream there are two cases
upsert key == primary key => we don’t need to convert anything, as the underlying concepts match; we maintain the information whether it was an update or an insert in the target table
upsert key <> primary key => this is very similar to retract, we need to deal with conflicts
What Does Sink Upsert Materializer (SUM) Do?
SUM acts like a custom conflict handler in an INSERT ON CONFLICT clause:
INSERT ... ON CONFLICT(...) DO SinkUpsertMaterializer(...)
On New Row: Generates an UPDATE equivalent to:
INSERT INTO inventory (id, name, price, quantity) SELECT ... ON CONFLICT(id) DO UPDATE SET id = EXCLUDED.id, name = EXCLUDED.name, price = EXCLUDED.price, quantity = EXCLUDED.quantity;
On Retraction: Rolls back to the previous value, requiring tracking of historical updates. (see the example from the motivation)
Changelog disorder
This is a self-incurred problem, caused by records travelling through different paths and by different processing delays on each path.
Example of Changelog Disorder
Given the following tables:
-- CDC source tables: s1 & s2
s1: id BIGINT, level BIGINT, PRIMARY KEY(id)
s2: id BIGINT, attr VARCHAR, PRIMARY KEY(id)
-- sink table: t1
t1: id BIGINT, level BIGINT, attr VARCHAR, PRIMARY KEY(id)
-- join s1 and s2 and insert the result into t1
INSERT INTO t1
SELECT s1.*, s2.attr
FROM s1 JOIN s2 ON s1.level = s2.id
An ideal flow of messages could look like:
Notes:
The change in s1 propagates through the pipeline to the sink.
Values change from First → Then → Finally.
Since s1.level is not a primary key, a retraction (-U) will be generated by a ChangelogNormalize (excluded in diagram) as part of s1.
-U and +U are not related by any key and thus take independent paths.
The diagram shows how events arrive at the sink in…
ideal order (1)
+I(id=1, level=10, attr='a1')
-U(id=1, level=10, attr='a1')
+U(id=1, level=20, attr='b1')
However shuffling can also lead to:
disorder (2)
+U(id=1, level=20, attr='b1')
+I(id=1, level=10, attr='a1')
-U(id=1, level=10, attr='a1')
disorder (3)
+I(id=1, level=10, attr='a1')
+U(id=1, level=20, attr='b1')
-U(id=1, level=10, attr='a1')
Disorder properties:
The “retraction” (-U) always takes the same path as the previous “add” (+I), thus is always in order.
The “add” (+U) is not related to the retraction pair (+I/-U) by any key and takes a different path.
The “add” (+U) might therefore arrive before the retraction (+I/-U).
What can we do…
… to fix the internal consistency
We need a distributed barrier to know for sure that a pair of UB and UA have been processed through the whole pipeline. This barrier needs the properties:
It is never generated between a UB/UA pair.
It never overtakes records.
A good candidate for that are watermarks or checkpoints, but only aligned (unaligned checkpoints can overtake records).
Watermarks:
Benefits:
more frequent
never overtake records
we can use generalized watermarks (https://cwiki.apache.org/confluence/x/oA6TEg ) and introduce regular specialised watermarks only for the purpose of SUM
operators like JOINs get better point in time semantics where the output watermark marks a point in time of a result.
Disadvantages:
Records “belonging” to different watermarks
at any point in time in SUM, we may have records that need a progression of watermark to values: W[3, 5, 8, 11, … ] before they can be compacted. This is because there may be arbitrary many watermarks flowing through the pipeline at the same time. This requires careful handling and preferably ordered map state (see: here)
in case of checkpoints we usually have one checkpoint pending (theoretically users can configure more pending checkpoints at the same time, but that’s almost never the case). Therefore we have records belonging to the current checkpoint or the next one.
idle partitions or source problems, which can lead to stalled output
Checkpoints:
Benefits:
more reliable
single version of a barrier across the entire pipeline. A checkpoint (chk-1) is injected to all the source tasks at the same time
Disadvantages:
does not work with unaligned checkpoints
usually less frequent
fan out operators (1-to-many joins, window operators) can stall progress similarly as idle partitions
Note: The solution is based on the fact that UB/UA are generated by the ChangelogNormalize or other intermediate nodes. If we had a source that produces UB/UA the solution may not apply. Such a source could produce corresponding UB/UA messages through different partitions or just insert watermarks in between.
…to fix state size
Support the ON CONFLICT clause giving users the chance to implement more optimal versions of SUM that are more tailored to the particular query, e.g.
If they’re confident there are no conflicts, they can use e.g. NOTHING or ERROR.
They may be able to accumulate all versions in a single ROW, e.g. track a SUM where they can add on update and subtract on retract.
Proposed changes
Syntax extension
We should extend the INSERT INTO syntax to support an ON CONFLICT clause:
INSERT INTO table_name [ AS alias ] [ ( column_name [, ...] ) ]
....
[ ON CONFLICT conflict_action ]
...
and conflict_action is one of:
DO NOTHING
DO ERROR
DO DEDUPLICATE
Default behaviour change
If no ON CONFLICT DO ... is provided and the upsert key is different from the primary key of the sink, we would fail during planning.
This forces users to reconsider the use case before submission. There are three situations in which the upsert key differs from the primary key:
An issue in the logic, e.g. missing GROUP BY
The planner is not smart enough to forward a full upsert key through the entire pipeline, e.g. a JOIN using a column that is a function of a PRIMARY KEY of the source
It is a valid use case where a user wants to deduplicate messages arriving for the PK
We suggest that by default we assume it is the situation described in 1). In this scenario we should fail hard and force the user to rewrite the query.
If it falls into 2) the user should use DO ERROR
If it falls into 3) they can use one of the provided strategies depending on their use case: DO NOTHING/DO ERROR/DO DEDUPLICATE
ON CONFLICT DO ERROR
If DO ERROR is chosen, we would throw an exception on a PK constraint violation during runtime. We would expect only a single source of data for a given PK.
Therefore we would throw if:
Multiple non-retracted records with the same PK arrive (valid for retract mode without upsert key)
Multiple records with the same PK, but different upsert key arrive
In order to solve the disorder problem we would compact and emit only on a watermark, as explained above.
ON CONFLICT DO NOTHING
Additionally we could provide the DO NOTHING option, which would be similar to the above, but instead of throwing an exception it would take the first record that arrived and ignore the others.
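The difference between DO ERROR and DO NOTHING can be sketched as follows. This is an illustrative model with made-up names and string values; the real operator works on compacted change sets and also handles retractions, which are omitted here:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the two non-rollback conflict strategies for a sink table.
// Names are illustrative, not the proposed API.
class ConflictResolver {
    enum Action { DO_NOTHING, DO_ERROR }

    private final Map<String, String> sinkState = new HashMap<>(); // pk -> value
    private final Action action;

    ConflictResolver(Action action) {
        this.action = action;
    }

    // Returns the value kept for the PK after applying the strategy.
    String onRecord(String pk, String value) {
        String existing = sinkState.get(pk);
        if (existing != null) {
            if (action == Action.DO_ERROR) {
                // DO ERROR: a second record for the same PK is a constraint violation
                throw new IllegalStateException("PK constraint violation for " + pk);
            }
            // DO NOTHING: keep the first record that arrived, ignore the new one
            return existing;
        }
        sinkState.put(pk, value);
        return value;
    }
}
```

Neither strategy needs any per-key history, which is the source of the state size reduction compared to the deduplicating rollback logic.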
Backwards compatibility and deduplicating logic
The ON CONFLICT DO DEDUPLICATE would use the current logic of keeping the entire history of changes and using the latest active record.
The solution is based on the assumption that a barrier is never inserted between UA/UB pair. This may not hold for retract sources where UA/UB messages are stored separately. In the first version we would prohibit the DO ERROR/NOTHING behaviours for retract sources.
Implementation details
Assigning timestamps
The above diagram shows the need to compact records based on the watermarks that arrived on the same input as the corresponding record.
The dotted line separates two inputs to a SUM operator. At the point when watermark W=4 arrives the combined watermark of the SUM operator progresses to 3. This means we can compact the “I” record, but we should not compact the UB on the second input, nor the UB, UA pair from the first input.
Therefore we need to assign each incoming record a timestamp = currentWatermarkOfInput + 1 and
compact records with timestamp <= currentCombinedWatermark .
This algorithm ensures there are no late events. The timestamps are assigned in the SUM operator, based on the incoming watermarks, so the timestamps will never lag behind the watermarks.
Pseudo code:
// key: the timestamp assigned to the record (watermark of the record's input + 1 at arrival)
// value: the record itself
MapState<Long, RowData> buffer;
// The current value for the primary key being processed
ValueState<RowData> currentValue;

// before we emit a combined watermark, we emit a compacted record
long combinedWatermark = ...;
Iterator<Map.Entry<Long, RowData>> iter = buffer.iterator();
RowData result = currentValue.value();
while (iter.hasNext()) {
    Map.Entry<Long, RowData> entry = iter.next();
    if (entry.getKey() > combinedWatermark) {
        break; // requires ordered map state, so all remaining entries are newer
    }
    result = compact(result, entry.getValue());
    iter.remove();
}
currentValue.update(result);
emit(result);
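The same flush logic can be mimicked in plain Java, with a TreeMap standing in for the ordered map state. This is only a sketch of the buffering mechanics (compact() is a placeholder that simply takes the latest change), not Flink code:

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Sketch of the watermark-gated buffer for one primary key.
// TreeMap stands in for an ordered MapState; all names are illustrative.
class WatermarkBuffer {
    // timestamp (input watermark + 1 at arrival) -> record
    private final TreeMap<Long, String> buffer = new TreeMap<>();
    private String currentValue = "";

    void add(long inputWatermark, String record) {
        // timestamp = currentWatermarkOfInput + 1, so the record can only be
        // compacted once the combined watermark has passed it
        buffer.put(inputWatermark + 1, record);
    }

    // Compact everything with timestamp <= combinedWatermark and emit the result.
    String onWatermark(long combinedWatermark) {
        Iterator<Map.Entry<Long, String>> iter = buffer.entrySet().iterator();
        String result = currentValue;
        while (iter.hasNext()) {
            Map.Entry<Long, String> entry = iter.next();
            if (entry.getKey() > combinedWatermark) {
                break; // ordered map: all remaining entries are newer
            }
            result = compact(result, entry.getValue());
            iter.remove();
        }
        currentValue = result;
        return result;
    }

    // Placeholder: a real compaction applies the change to the current value.
    private static String compact(String current, String change) {
        return change; // last change wins, for illustration only
    }
}
```

Because the map is ordered by timestamp, the flush stops at the first entry newer than the combined watermark; everything behind the watermark is compacted and removed from state.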
Compaction:
The compaction applies all buffered changes and should produce a single value at the end without emitting intermediate results.
For example for disorder (2):
+U(id=1, level=20, attr='b1') +I(id=1, level=10, attr='a1') -U(id=1, level=10, attr='a1')
State is empty, we start with an empty list.
we get +U(id=1, level=20, attr='b1'), we append the record to a temporary list: [+U(id=1, level=20, attr='b1')]
we get +I(id=1, level=10, attr='a1'), we append it [+U(id=1, level=20, attr='b1'), +I(id=1, level=10, attr='a1')]
we get -U(id=1, level=10, attr='a1'), we remove the corresponding record from the list [+U(id=1, level=20, attr='b1')]
we processed all buffered records
if the list has 1 record, we emit +U
if the list has > 1 record, we throw an exception because of a PK constraint violation
if the list has 0 records, we emit -D
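The steps above can be sketched as a small compaction routine over the buffered changes for one primary key. The string encoding of changes (e.g. "+U b1") is purely illustrative:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the compaction step for one primary key (illustrative, not Flink code).
class Compactor {
    // Each change is a marker followed by the value, e.g. "+U b1" or "-U a1".
    // Returns the single compacted output: "+U <value>", "-D", or throws.
    static String compact(List<String> changes) {
        List<String> active = new ArrayList<>();
        for (String change : changes) {
            String kind = change.substring(0, 2);
            String value = change.substring(3);
            if (kind.startsWith("+")) {
                active.add(value);    // +I/+U: append to the temporary list
            } else {
                active.remove(value); // -U/-D: remove the corresponding record
            }
        }
        if (active.size() > 1) {
            // more than one active record for the PK after compaction
            throw new IllegalStateException("PK constraint violation");
        }
        return active.isEmpty() ? "-D" : "+U " + active.get(0);
    }
}
```

Running it on disorder (2) yields the single final value without any intermediate output, matching the walkthrough above.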
Configuration changes
For a correct implementation of ON CONFLICT DO ERROR/DO NOTHING we need watermarks flowing. There are a few options for how we can ensure that is the case.
Option 1
We leverage existing watermarks that are defined. In case time attributes are not present in the query we would fail such queries during planning.
Option 2
As explained in fix-consistency we could use aligned checkpoints as well. Therefore we can introduce an option where a user can control if watermarks or checkpoints should be used:
ConfigOptions.key("table.exec.sink.upserts.compaction-mode")
    .enumType(SinkUpsertMaterializerCompactionMode.class)
    .defaultValue(SinkUpsertMaterializerCompactionMode.WATERMARK) // CHECKPOINT or WATERMARK
Option 3
The above option could be further extended with e.g. SinkUpsertMaterializerCompactionMode.AUTO_WATERMARK which would add dedicated automatic watermarks in case of SUM with an interval controlled with e.g.
ConfigOptions.key("table.exec.sink.upserts.compaction-interval")
    .durationType()
The three options don't need to be implemented all at once. They can build incrementally on top of each other.
Future possible extensions
We would not introduce the fully customisable option ( DO SET ) to define users' own logic for deduplication. At least not in the first version. This would require users to deal with retractions, which is a complex task.
If necessary, we can extend the strategies in the future by introducing further options in the same way as DO NOTHING , DO DEDUPLICATE , …
Rejected alternatives
Introduce RowId/ChangeId
Goal:
Attach more metadata to each row to fix the changelog disorder
Relate the 2 artifacts (- and +) of an update (i.e. change)
General design:
Every source and row producing operator attaches a Change ID to in-flight rows.
The Change ID allows to trace the origin of a change in (potentially a large bulk of) retraction messages.
Most operators such as Calc and even Join only forward the change ID.
The concept of a Change ID is similar to Lamport timestamps
The Change ID is a space-efficient 15-byte value that encodes:
1 byte: header (0 = no variable length, 1 = with variable length, and future evolution)
2 bytes: ID of the subtask
8 bytes: event time timestamp
4 bytes: uniqueness component (counter)
n bytes: variable length component
Reasoning:
The subtask component uniquely identifies the location.
The timestamp allows to relate rows with completeness barriers (i.e. watermarks)
The counter for avoiding duplicates without generating an expensive UUID or state access. The counter resets with every newly seen timestamp.
The header component enables evolution.
The prepared variable length part allows to attach additional information such as Kafka topic/partition/offset, Debezium Transaction Ids, or other consistency related metadata for lineage.
Note: The Change ID is not persisted in state except for the sink. It is a pure in-flight value.
We can further evolve the Change ID in the future.
It should be a moderate LOE to update the code base for this.
Watermarks allow to finalize a change (as described above)
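The fixed 15-byte part of the layout described above could be encoded as follows. This is a sketch only; the variable-length component and any real serialization format are omitted:

```java
import java.nio.ByteBuffer;

// Sketch of the fixed 15-byte Change ID layout
// (header, subtask id, event-time timestamp, counter). Illustrative only.
class ChangeId {
    static byte[] encode(byte header, short subtaskId, long timestamp, int counter) {
        return ByteBuffer.allocate(15)
                .put(header)          // 1 byte: header / evolution flag
                .putShort(subtaskId)  // 2 bytes: ID of the subtask
                .putLong(timestamp)   // 8 bytes: event-time timestamp
                .putInt(counter)      // 4 bytes: uniqueness counter
                .array();
    }

    // The timestamp is what relates a row to completeness barriers (watermarks).
    static long timestampOf(byte[] changeId) {
        return ByteBuffer.wrap(changeId).getLong(3); // skip header + subtask id
    }
}
```

Keeping the timestamp at a fixed offset makes it cheap to extract for the watermark-based finalization without deserializing the whole ID.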
The difference to the suggested solution
The main difference is that the timestamp is assigned at the source, where the UB/UA is created. This would let us compact some of the records on the fly instead of compacting only on watermarks.
For example:
For disorder (2)
+U(id=1, level=20, attr='b1') [t = 5]
+I(id=1, level=10, attr='a1') [t = 1]
-U(id=1, level=10, attr='a1') [t = 5]
Steps:
Input: +U(id=1, level=20, attr='b1') [t = 5]
State is empty -> emit +I and store [t = 5]
Input: +I(id=1, level=10, attr='a1') [t = 1]
State is not empty: Compare timestamp.
1 < 5 → discard +I
Input: -U(id=1, level=10, attr='a1') [t = 5]
State is not empty: Compare lineage by backtracking.
5 == 5 → discard -U
Output:
+I(id=1, level=20, attr='b1')
The solution can help us to solve the changelog disorder problem, but it does not help with the internal consistency issue. If we want to fix that as well, we still need the compaction on watermarks. At the same time it increases the size of all flowing records. Therefore it was rejected in favour of simply compacting all records once on the progression of watermarks.

