Motivation
Changelog Mode
In streaming SQL, there are generally four kinds of messages: INSERT (+I), UPDATE_BEFORE (-U), UPDATE_AFTER (+U), DELETE (-D). When a record is being inserted and then updated, this can be encoded as [ +I, -U, +U ]. In Flink, such messages might flow through the pipeline without a strict order even though they originate from the same event. However, downstream systems expect these messages strictly in order; otherwise, primary key violation or data loss might occur.
SinkUpsertMaterializer is the operator that reconciles these messages to ensure the order before sending them to the downstream system.
Please see [1] for more information on the topic and why this reconciliation is necessary. Highly recommended for a good understanding.
Current implementation
To achieve that, SinkUpsertMaterializer maintains a “history” of all the updates for the given key - represented as java.util.List in code. New records are usually added to the end of the list; retractions might remove an element at any position (this is a simplification; please refer to the above article for more details). The List is stored as a ValueState.
This approach has several significant drawbacks:
- On every update, the whole list has to be read, (de)serialized, and written back
- To find the record to retract (or to update, in the upsert-key case), the whole list has to be traversed in O(n) time
- The whole list has to be loaded into memory, which increases memory pressure and creates the risk of OutOfMemory errors
As a result, in some of our production use cases, performance drops to 1-5 records per second. On top of that, such low throughput often results in hard back-pressure and checkpointing problems (even with Unaligned Checkpoints enabled).
All of this renders the current implementation unusable and forces rewriting some queries (which is sometimes, but not always, possible).
Solutions outline
The problem can be solved by:
1. A more efficient implementation (which we introduce below)
2. Cleaning up the state/history more aggressively (e.g. on watermark)
3. Significant changes to the SQL planner that would allow avoiding SinkUpsertMaterializer or largely changing its semantics (currently under investigation)
4. Disallowing SinkUpsertMaterializer altogether and requiring users to rewrite their queries
5. Potentially, better support in the downstream systems
The preferred solution depends on the use case and the timeline.
We presume that for most Flink users (including ourselves), a more efficient implementation (1) works best - at least in the short-/mid-term.
This FLIP serves to synchronize the efforts in different directions and to document the design for (1).
Retraction-aware deduplication and similar use cases
Apart from that, there are some other cases when a similar functionality could be useful:
- the existing FirstValueWithRetractAggFunction and LastValueWithRetractAggFunction
- potentially, retraction-aware lag and lead functions
- potentially, an efficient retraction-aware deduplication operator
The last one could be especially useful; in fact, SinkUpsertMaterializer already deduplicates the retraction stream.
All of these cases could use the same data structure that is proposed in this FLIP.
Public Interfaces
New configuration option:
# The strategy to use in SinkUpsertMaterializer.
# Supported values:
# - LEGACY - simple implementation based on ValueState<List> (the original implementation)
#   -- optimal for cases with history under approx. 100 elements
#   -- limited TTL support (per-key granularity, i.e. no expiration for old history elements)
# - MAP - OrderedMultiSet-based implementation built on a combination of several MapStates maintaining ordering and fast-lookup properties
#   -- faster and more memory-efficient on long histories
#   -- currently, no TTL support (to be added in the future)
#   -- slower on short histories
#   -- requires more space
# - VALUE - similar to LEGACY, but compatible with MAP
#   -- allows switching to ADAPTIVE
# - ADAPTIVE
#   -- alternates between MAP and VALUE depending on the number of entries for the given key
#   -- starts with VALUE and switches to MAP upon reaching the threshold.high value (and back to VALUE when reaching low)
# The default is LEGACY.
# The option takes effect during planning / compiled plan generation.
# Existing jobs won't be affected by this option.
table.exec.sink.upsert-materialize.strategy: [ LEGACY | MAP | VALUE | ADAPTIVE ]

# When using strategy=ADAPTIVE, defines the number of entries per key at which the implementation is changed from VALUE to MAP.
# The option takes effect during job (re)starting.
table.exec.sink.upsert-materialize.adaptive.threshold.high:

# When using strategy=ADAPTIVE, defines the number of entries per key at which the implementation is changed from MAP to VALUE.
# The option takes effect during job (re)starting.
table.exec.sink.upsert-materialize.adaptive.threshold.low:
The threshold options above are @Experimental. When not specified, the default values will be used, which depend on the state backend used:
| Parameter | RocksDB | HashMap |
|---|---|---|
| table.exec.sink.upsert-materialize.adaptive.threshold.high | 50 | 400 |
| table.exec.sink.upsert-materialize.adaptive.threshold.low | 40 | 300 |
To implement the logic of choosing the value based on the state backend, a new method, getBackendTypeIdentifier, will be added to KeyedStateStore:
// flink-core
package org.apache.flink.api.common.state;

public interface KeyedStateStore {

    @Experimental
    String getBackendTypeIdentifier(); // return type shown as String for illustration
}
(the exact return type is an implementation detail).
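To illustrate, the backend-dependent defaults from the table above could be derived roughly as follows. This is a self-contained sketch: the identifier strings ("rocksdb", "hashmap"), the class name, and the fallback behavior are assumptions for illustration, not part of the proposal.

```java
// Sketch: choosing default ADAPTIVE thresholds from the backend type identifier
// that KeyedStateStore#getBackendTypeIdentifier would return.
// The identifier values and the fallback choice are assumptions.
class AdaptiveThresholdDefaults {
    static final int ROCKSDB_HIGH = 50, ROCKSDB_LOW = 40;
    static final int HEAP_HIGH = 400, HEAP_LOW = 300;

    /** Returns {high, low} default thresholds for the given backend identifier. */
    static int[] defaultsFor(String backendTypeIdentifier) {
        switch (backendTypeIdentifier) {
            case "rocksdb":
                return new int[] {ROCKSDB_HIGH, ROCKSDB_LOW};
            case "hashmap":
                return new int[] {HEAP_HIGH, HEAP_LOW};
            default:
                // Unknown backend: fall back to the more conservative RocksDB defaults.
                return new int[] {ROCKSDB_HIGH, ROCKSDB_LOW};
        }
    }
}
```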
Proposed Changes
Similar to the original implementation, the new one captures the history of the changes for a record (and not the point-in-time view).
However, instead of a ValueState<List>, it emulates Ordered(Multi)Set using the existing Flink state primitives:
- the order is maintained by attaching a SequenceNumber (long) to every element
- "linking" to the next and previous elements is done using SequenceNumbers
- fast lookup by SequenceNumber is achieved using MapState<SequenceNumber, Node>
- fast lookup by RowData is achieved using MapState<RowData, SequenceNumber>
- the latest SequenceNumber is kept in a ValueState
Note that due to retractions, there might be “holes” in the list; however, that doesn’t affect correctness as long as the neighboring nodes are relinked together.
In the upsert-key case, the record is written to its existing position in the list (if present; otherwise, it is appended to the end).
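The structure can be sketched with plain Java collections standing in for Flink's MapState and ValueState. This is a simplified, non-authoritative model: the Node layout and all names (sqnToNode, rowToSqn, highestSqn) are illustrative, and for brevity rowToSqn tracks only the latest occurrence of each row.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the proposed Ordered(Multi)Set emulation. HashMaps model MapState,
// plain fields model ValueState; names are illustrative assumptions.
class OrderedMultiSetSketch<T> {
    static final class Node<R> {
        final R row;
        long prev = -1, next = -1; // sequence numbers of the neighbors; -1 = none
        Node(R row) { this.row = row; }
    }

    private final Map<Long, Node<T>> sqnToNode = new HashMap<>(); // MapState<SequenceNumber, Node>
    private final Map<T, Long> rowToSqn = new HashMap<>();        // MapState<RowData, SequenceNumber>
    private long highestSqn = -1;                                  // ValueState<SequenceNumber>
    private long nextSqn = 0;

    /** Appends a row to the end of the history with O(1) state accesses. */
    void add(T row) {
        long sqn = nextSqn++;
        Node<T> node = new Node<>(row);
        node.prev = highestSqn;
        if (highestSqn >= 0) {
            sqnToNode.get(highestSqn).next = sqn;
        }
        sqnToNode.put(sqn, node);
        rowToSqn.put(row, sqn); // simplification: keeps only the latest occurrence
        highestSqn = sqn;
    }

    /** Retracts the latest occurrence of the row; the neighbors are relinked, leaving a "hole". */
    boolean retract(T row) {
        Long sqn = rowToSqn.remove(row);
        if (sqn == null) {
            return false;
        }
        Node<T> node = sqnToNode.remove(sqn);
        if (node.prev >= 0) sqnToNode.get(node.prev).next = node.next;
        if (node.next >= 0) sqnToNode.get(node.next).prev = node.prev;
        if (sqn == highestSqn) highestSqn = node.prev;
        return true;
    }

    /** The latest remaining row (what the materializer would emit), or null if empty. */
    T last() {
        return highestSqn >= 0 ? sqnToNode.get(highestSqn).row : null;
    }
}
```

Note how retracting the last element makes the previous one current again without scanning a list, which is the O(1) behavior the complexity analysis below refers to.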
Complexity analysis
Space
O(C * n), where:
- n is the number of records ("history size")
- C = 2, since we have two maps, both holding incoming records
Time
O(1), with 7 reads and 3 writes in the worst case (2 reads can be eliminated)
Switching the algorithm dynamically
The implementation based on ValueState might work better on short lists; for RocksDB, this is under roughly 50 elements.
To minimize the regression in such cases, ADAPTIVE implementation starts with VALUE and switches to MAP dynamically upon reaching a pre-configured threshold:
- For RocksDB, this threshold is around 50 elements (given 250 byte records).
- For Heap, this threshold is around 400 elements (given 250 byte records).
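The switching with its high/low hysteresis could look roughly like this. It is only a sketch: the enum, class, and method names are hypothetical, and the actual migration of state between the two representations is omitted.

```java
// Sketch of the ADAPTIVE strategy's hysteresis: VALUE until the history reaches
// the high threshold, MAP until it shrinks back to the low threshold.
// Names are illustrative; state migration between the representations is omitted.
class AdaptiveSwitch {
    enum Impl { VALUE, MAP }

    private final int high, low;
    private Impl current = Impl.VALUE; // ADAPTIVE starts with VALUE

    AdaptiveSwitch(int high, int low) {
        this.high = high;
        this.low = low;
    }

    /** Decides which implementation to use for the given per-key history size. */
    Impl implFor(int historySize) {
        if (current == Impl.VALUE && historySize >= high) {
            current = Impl.MAP;   // here the real operator would move the list into the maps
        } else if (current == Impl.MAP && historySize <= low) {
            current = Impl.VALUE; // and here move the maps back into a list
        }
        return current;
    }
}
```

The gap between the two thresholds prevents flapping between the representations when the history size oscillates around a single cutoff.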
However, such switching incurs some performance penalty, which is visible on lists of size up to 10 and results in ADAPTIVE performance ~20% lower than VALUE/LEGACY.
In such cases, to avoid the performance drop, users might choose either LEGACY or VALUE; the latter allows switching to the ADAPTIVE strategy later.
Benchmark (adaptiveThreshold) (hasUpsertKey) (payloadSize) (retractDelay) (retractPercentage) (stateBackend) (strategy) (ttlTimeDomain) Mode Cnt Score Error Units
SinkUpsertMaterializerBenchmark.run 400 true 250 2 100 ROCKSDB ADAPTIVE EVENT_TIME thrpt 10 174.303 ± 15.952 ops/ms
SinkUpsertMaterializerBenchmark.run 400 true 250 2 100 ROCKSDB VALUE EVENT_TIME thrpt 10 218.455 ± 18.098 ops/ms
SinkUpsertMaterializerBenchmark.run 400 true 250 2 100 ROCKSDB LEGACY EVENT_TIME thrpt 10 224.355 ± 19.125 ops/ms
SinkUpsertMaterializerBenchmark.run 400 true 250 10 100 ROCKSDB ADAPTIVE EVENT_TIME thrpt 10 102.678 ± 7.179 ops/ms
SinkUpsertMaterializerBenchmark.run 400 true 250 10 100 ROCKSDB VALUE EVENT_TIME thrpt 10 133.207 ± 6.577 ops/ms
SinkUpsertMaterializerBenchmark.run 400 true 250 10 100 ROCKSDB LEGACY EVENT_TIME thrpt 10 129.972 ± 9.499 ops/ms
Benchmark results
The benchmark (see code below) inserts a fixed number of records (inserts and deletes) in a loop and measures the time it takes to handle all the records. It uses a test harness, not a full Flink job.
The benchmark has the following parameters:
- total number of records: fixed to 10K; all records are inserted under the same stream key
- strategy: LEGACY, VALUE, MAP, ADAPTIVE
- hasUpsertKey: true/false
- stateBackend type: HEAP/ROCKSDB
- payload size: fixed to 250 bytes
- retractPercentage: fixed to 100% (i.e. all records are eventually retracted, except for the first "retractDelay" records)
- retractDelay: from which record to start retraction
Both retractPercentage and retractDelay control how frequently retraction happens and how long the history is. Retraction is inefficient with LEGACY/VALUE strategy on long lists (because it's O(n)).
Note that, in practice, the results would depend a lot on the payload and the generated equals and hashCode functions. The more CPU-intensive these functions are, the slower the LEGACY/VALUE strategy is compared to MAP/ADAPTIVE.
Also, the results below are preliminary and might slightly change after the proposed implementation is approved and merged.
As can be seen below:
- the VALUE strategy is superior for short histories, especially with the Heap State Backend
- starting at a certain history size (which depends on the state backend, upsert key, and retraction percentage), MAP becomes dramatically faster
- performance of VALUE/LEGACY continues to degrade as the list grows, while MAP/ADAPTIVE stays roughly constant
RocksDB State Backend
With the ADAPTIVE strategy, it can be seen that performance degrades until the number of retained elements reaches 50; after that, the implementation switches to MAP, which keeps performance relatively stable.
On the other hand, performance of the LEGACY strategy continues to degrade and eventually drops almost to zero:
Benchmark (adaptiveThreshold) (hasUpsertKey) (payloadSize) (retractDelay) (retractPercentage) (stateBackend) (strategy) (ttlTimeDomain) Mode Cnt Score Error Units
SinkUpsertMaterializerBenchmark.run 50 false 250 2 100 ROCKSDB ADAPTIVE EVENT_TIME thrpt 3 285.419 ± 33.332 ops/ms
SinkUpsertMaterializerBenchmark.run 50 false 250 2 100 ROCKSDB LEGACY EVENT_TIME thrpt 3 297.157 ± 109.149 ops/ms
SinkUpsertMaterializerBenchmark.run 50 false 250 10 100 ROCKSDB ADAPTIVE EVENT_TIME thrpt 3 146.888 ± 32.391 ops/ms
SinkUpsertMaterializerBenchmark.run 50 false 250 10 100 ROCKSDB LEGACY EVENT_TIME thrpt 3 153.472 ± 95.353 ops/ms
SinkUpsertMaterializerBenchmark.run 50 false 250 50 100 ROCKSDB ADAPTIVE EVENT_TIME thrpt 3 35.537 ± 5.646 ops/ms
SinkUpsertMaterializerBenchmark.run 50 false 250 50 100 ROCKSDB LEGACY EVENT_TIME thrpt 3 51.073 ± 29.070 ops/ms
SinkUpsertMaterializerBenchmark.run 50 false 250 100 100 ROCKSDB ADAPTIVE EVENT_TIME thrpt 3 34.679 ± 29.491 ops/ms
SinkUpsertMaterializerBenchmark.run 50 false 250 100 100 ROCKSDB LEGACY EVENT_TIME thrpt 3 28.884 ± 18.927 ops/ms
SinkUpsertMaterializerBenchmark.run 50 false 250 1000 100 ROCKSDB ADAPTIVE EVENT_TIME thrpt 3 29.265 ± 10.787 ops/ms
SinkUpsertMaterializerBenchmark.run 50 false 250 1000 100 ROCKSDB LEGACY EVENT_TIME thrpt 3 3.333 ± 2.242 ops/ms
SinkUpsertMaterializerBenchmark.run 50 false 250 5000 100 ROCKSDB ADAPTIVE EVENT_TIME thrpt 3 22.494 ± 3.781 ops/ms
SinkUpsertMaterializerBenchmark.run 50 false 250 5000 100 ROCKSDB LEGACY EVENT_TIME thrpt 3 0.665 ± 0.234 ops/ms
It can also be seen that as more records are retained in the history (i.e. with a larger retractDelay), performance of the current implementation drops dramatically:
Heap State Backend
Benchmark (adaptiveThreshold) (hasUpsertKey) (payloadSize) (retractDelay) (retractPercentage) (stateBackend) (strategy) (ttlTimeDomain) Mode Cnt Score Error Units
SinkUpsertMaterializerBenchmark.run 400 false 250 2 100 HEAP ADAPTIVE EVENT_TIME thrpt 10 4365.903 ± 45.798 ops/ms
SinkUpsertMaterializerBenchmark.run 400 false 250 2 100 HEAP LEGACY EVENT_TIME thrpt 10 3880.419 ± 63.156 ops/ms
SinkUpsertMaterializerBenchmark.run 400 false 250 10 100 HEAP ADAPTIVE EVENT_TIME thrpt 10 3923.655 ± 36.138 ops/ms
SinkUpsertMaterializerBenchmark.run 400 false 250 10 100 HEAP LEGACY EVENT_TIME thrpt 10 3397.935 ± 41.414 ops/ms
SinkUpsertMaterializerBenchmark.run 400 false 250 50 100 HEAP ADAPTIVE EVENT_TIME thrpt 10 3114.705 ± 71.971 ops/ms
SinkUpsertMaterializerBenchmark.run 400 false 250 50 100 HEAP LEGACY EVENT_TIME thrpt 10 2090.640 ± 23.502 ops/ms
SinkUpsertMaterializerBenchmark.run 400 false 250 100 100 HEAP ADAPTIVE EVENT_TIME thrpt 10 2643.426 ± 78.599 ops/ms
SinkUpsertMaterializerBenchmark.run 400 false 250 100 100 HEAP LEGACY EVENT_TIME thrpt 10 2015.997 ± 21.539 ops/ms
SinkUpsertMaterializerBenchmark.run 400 false 250 1000 100 HEAP ADAPTIVE EVENT_TIME thrpt 10 583.310 ± 5.751 ops/ms
SinkUpsertMaterializerBenchmark.run 400 false 250 1000 100 HEAP LEGACY EVENT_TIME thrpt 10 210.121 ± 3.533 ops/ms
SinkUpsertMaterializerBenchmark.run 400 true 250 2 100 HEAP ADAPTIVE EVENT_TIME thrpt 10 4059.498 ± 108.323 ops/ms
SinkUpsertMaterializerBenchmark.run 400 true 250 2 100 HEAP LEGACY EVENT_TIME thrpt 10 3503.332 ± 42.808 ops/ms
SinkUpsertMaterializerBenchmark.run 400 true 250 10 100 HEAP ADAPTIVE EVENT_TIME thrpt 10 3769.139 ± 81.589 ops/ms
SinkUpsertMaterializerBenchmark.run 400 true 250 10 100 HEAP LEGACY EVENT_TIME thrpt 10 3159.818 ± 27.248 ops/ms
SinkUpsertMaterializerBenchmark.run 400 true 250 50 100 HEAP ADAPTIVE EVENT_TIME thrpt 10 2663.425 ± 43.302 ops/ms
SinkUpsertMaterializerBenchmark.run 400 true 250 50 100 HEAP LEGACY EVENT_TIME thrpt 10 1554.082 ± 22.305 ops/ms
SinkUpsertMaterializerBenchmark.run 400 true 250 100 100 HEAP ADAPTIVE EVENT_TIME thrpt 10 1987.133 ± 38.208 ops/ms
SinkUpsertMaterializerBenchmark.run 400 true 250 100 100 HEAP LEGACY EVENT_TIME thrpt 10 974.601 ± 11.899 ops/ms
SinkUpsertMaterializerBenchmark.run 400 true 250 1000 100 HEAP ADAPTIVE EVENT_TIME thrpt 10 1560.245 ± 21.188 ops/ms
SinkUpsertMaterializerBenchmark.run 400 true 250 1000 100 HEAP LEGACY EVENT_TIME thrpt 10 103.242 ± 1.637 ops/ms
State Backend support
For RocksDB (or any backend that operates on serialized records), we rely on the binary form of the keys (RowData) instead of the generated record equalizer. For the upsert-key case, we project the records before using them as keys in RocksDB.
For HeapStateBackend, it is trickier:
1. It uses equals, which doesn't employ the supplied record equalizer
2. If we inject the equalizer as a "transient" field into every record, it would be lost on recovery
3. On recovery, the record type might change (e.g. from ProjectedRowData to GenericRowData), breaking equals/hashCode even if we solve (1)
4. hashCode needs to be efficient enough, but the set of fields is not known at runtime
To address the above issues:
- a wrapper around RowData is introduced and used as the key
- the equalizer is injected into it, including during recovery
- the equalizer is snapshotted into the meta info snapshot
- the hashCode function is generated at planning time and handled the same way as the equalizer
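A minimal model of such a wrapper, with the generated equalizer and hashCode represented as plain functional interfaces: the class and field names are illustrative only, and the real implementation would operate on RowData with code generated by the planner.

```java
import java.util.function.BiPredicate;
import java.util.function.ToIntFunction;

// Sketch of the heap-backend key wrapper: equality and hashing are delegated to
// functions supplied (and re-injected after recovery) rather than to the record's
// own equals/hashCode, so type changes like ProjectedRowData -> GenericRowData
// don't break key lookups. Names are illustrative assumptions.
final class RowWrapper<T> {
    private final T row;
    // "Transient" in spirit: re-injected on recovery instead of being serialized with the row.
    private final BiPredicate<T, T> equalizer;
    private final ToIntFunction<T> hasher;

    RowWrapper(T row, BiPredicate<T, T> equalizer, ToIntFunction<T> hasher) {
        this.row = row;
        this.equalizer = equalizer;
        this.hasher = hasher;
    }

    @Override
    @SuppressWarnings("unchecked")
    public boolean equals(Object o) {
        if (!(o instanceof RowWrapper)) {
            return false;
        }
        // Delegates to the injected equalizer instead of the row's own equals.
        return equalizer.test(row, ((RowWrapper<T>) o).row);
    }

    @Override
    public int hashCode() {
        // Delegates to the injected (planner-generated) hash function.
        return hasher.applyAsInt(row);
    }
}
```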
TTL support
In Flink, TTL is managed per state element and doesn't guarantee consistency across multiple states/elements. However, the proposed algorithm requires all three data structures (and even neighboring elements) to be consistent with each other. Reconciling all the inconsistencies arising from applying TTL would require non-trivial logic, which is deemed error-prone. Besides that, updating the pointers in neighboring nodes "artificially" bumps their TTL.
All that makes the current TTL mechanism in Flink unsuitable for the proposed operator.
Instead, we're planning to support TTL at the operator level. The simplest approach is to rely on the insertion-order property that we have for the history:
- Attach an insertion timestamp to every record when adding it
- Then, every N seconds (on system time):
  1. If the list is empty, return
  2. If the timestamp of the first element in the list <= now - TTL:
     - unlink the node (sqnToNodeState)
     - update rowToSqn if needed (rowToSqnState)
     - clear highestSqnState if it was the latest element
  3. Return to (1)
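The cleanup loop can be modeled with an in-memory deque standing in for the linked history. This is only a sketch: in the real operator, each expired element would be unlinked from the three states as described in the steps above, and the sweep would be driven by a timer.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of the operator-level TTL sweep: records carry their insertion timestamp,
// and a periodic sweep expires elements from the head of the (insertion-ordered)
// history. Names and the deque representation are illustrative assumptions.
class TtlSweepSketch {
    static final class Entry {
        final long timestampMillis;
        final String row;
        Entry(long timestampMillis, String row) {
            this.timestampMillis = timestampMillis;
            this.row = row;
        }
    }

    private final Deque<Entry> history = new ArrayDeque<>();
    private final long ttlMillis;

    TtlSweepSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** The insertion timestamp is attached to every record when adding it. */
    void add(String row, long nowMillis) {
        history.addLast(new Entry(nowMillis, row));
    }

    /** Runs every N seconds on system time; expires old elements from the head. */
    int expire(long nowMillis) {
        int removed = 0;
        // (1) stop when the list is empty; (2) expire the first element while its
        // timestamp <= now - TTL; (3) repeat.
        while (!history.isEmpty() && history.peekFirst().timestampMillis <= nowMillis - ttlMillis) {
            // In the real operator: unlink the node (sqnToNodeState), update
            // rowToSqn if needed, and clear highestSqnState if it was the latest.
            history.pollFirst();
            removed++;
        }
        return removed;
    }

    int size() {
        return history.size();
    }
}
```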
This solution is much simpler than reconciling inconsistencies caused by “system-level” TTL - at the cost of somewhat lower efficiency.
Apart from that, in the upsert-key case, an entry might be updated in the middle of the list, which might delay the expiration of (potentially older) subsequent elements. This could be solved by a more elaborate cleanup strategy based on additional pointers; for now, it seems to be a very narrow case.
While TTL is crucial for SinkUpsertMaterializer (and, in particular, for the proposed implementation, which doubles the state size), we plan to add TTL support as a follow-up effort (storing the timestamps along with every record will be implemented from the get-go).
Note that, in the original implementation, TTL only works well in the non-degenerate case of an ever-growing, effectively infinite key space: it doesn't expire old history elements for keys to which new elements keep being added.
Extracting a reusable primitive to use in other operators or functions
As discussed in the motivation section, there are existing and potential use cases for the data structure that we introduce here.
Additionally, extracting an interface could help with switching the implementation, testing, migration in the future.
Compatibility, Deprecation, and Migration Plan
What impact (if any) will there be on existing users?
Existing users won't be impacted; only those who explicitly enable the feature will get it, and only for new jobs.
If we are changing behavior how will we phase out the older behavior?
Once we add TTL support, we’ll make the new version default and deprecate the old one
If we need special migration tools, describe them here.
The new implementation is state-incompatible with the current one
In the first iteration, we don’t plan to provide migration tools to minimize the risks (e.g. of state corruption during migration) and effort
Switching dynamically between the VALUE and MAP opens up a possibility to support migration from LEGACY; however, migration from LEGACY is out of scope of this FLIP
When will we remove the existing behavior?
Once the above is implemented, the new operator could also be used for the existing jobs, and the original one can be removed
Test Plan
We’ll extend the existing test suite to cover more cases using old and new implementation
We’ll add micro benchmarks - either as standalone tests to flink repo, or as JMH benchmarks to apache/flink-benchmarks
Rejected Alternatives
See also Solutions Outline for solutions that are orthogonal to this one.
Not supporting Heap State backend
A related decision is to support the Heap State backend (rather than choosing the algorithm based on the backend type). Otherwise, changing the state backend wouldn't be possible, even through a canonical savepoint.
Extracting a new State primitive
This would allow customizing the behavior for different State backends. However, the O(n) algorithm becomes a problem for any state backend sooner or later. In theory, we could have a dynamic switch from the current to the proposed algorithm at runtime, based on N, and then have different thresholds for different backends. However, this introduces significant risks and complexity, which at the moment doesn't seem justified.
Relying on orderliness property of state backend
An alternative way to get the OrderedMap is to rely on the property of RocksDBMapState which is implicitly sorted (as it's backed by a Sorted String Table).
"Linked" nodes wouldn't be necessary with this approach. This would, to some extent, reduce the number of state accesses and simplify the code, among other benefits.
However, we'd have to use iterators, e.g. to find the penultimate element to emit when the last element is retracted. From our experience, iterators are much slower than point lookups.
References
[1] https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-event-out-of-orderness

