Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Changelog Mode

In streaming SQL, there are generally four kinds of messages: INSERT (+I), UPDATE_BEFORE (-U), UPDATE_AFTER (+U), DELETE (-D). When a record is being inserted and then updated, this can be encoded as [ +I, -U, +U ]. In Flink, such messages might flow through the pipeline without a strict order even though they originate from the same event. However, downstream systems expect these messages strictly in order; otherwise, primary key violation or data loss might occur.

SinkUpsertMaterializer is the operator that reconciles these messages to ensure the order before sending them to the downstream system.

Please see [1] for more information on the topic and why this reconciliation is necessary; we highly recommend it for a good understanding of the problem.
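As a small illustration of why ordering matters (a hypothetical sketch, not Flink code): applying the same changelog messages in two different orders against an upsert-style sink keyed by primary key yields different final states.

```python
# Hypothetical sketch of an upsert sink keyed by primary key:
# +I/+U upsert the row, -U/-D delete by key. Not Flink code.

def apply(sink, kind, key, value):
    if kind in ("+I", "+U"):
        sink[key] = value       # upsert by primary key
    else:                       # "-U" or "-D"
        sink.pop(key, None)     # delete by primary key

# Intended order [ +I, -U, +U ]: the final value is "v2"
s1 = {}
for msg in [("+I", "k", "v1"), ("-U", "k", "v1"), ("+U", "k", "v2")]:
    apply(s1, *msg)

# Reordered so the retraction arrives last: the row is lost
s2 = {}
for msg in [("+I", "k", "v1"), ("+U", "k", "v2"), ("-U", "k", "v1")]:
    apply(s2, *msg)

print(s1)  # {'k': 'v2'}
print(s2)  # {}  (data loss)
```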


Current implementation

To achieve that, SinkUpsertMaterializer maintains a “history” of all the updates for the given key - represented as java.util.List in code. New records are usually added to the end of the list; retractions might remove an element at any position (this is a simplification; please refer to the above article for more details). The List is stored as a ValueState.

This approach has several significant drawbacks:

  1. On every update, the whole list has to be read, (de)serialized, and written back

  2. To find the record to retract (or to update in the upsert-key case), the whole list has to be traversed in O(n) time

  3. The whole list has to be loaded in memory, which increases the memory pressure and creates the risk of OutOfMemory errors
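To make the drawbacks concrete, here is a hypothetical model of the list-based approach (plain Python dicts and lists standing in for keyed ValueState<List>); note the full-list read/write on every change and the O(n) scan on retraction:

```python
# Hypothetical model of the LEGACY approach: the whole history of a key
# is a single list value, read and written back on every change.

class LegacyMaterializer:
    def __init__(self):
        self.state = {}  # stands in for ValueState<List> per stream key

    def add(self, key, row):
        history = self.state.get(key, [])   # read + deserialize whole list
        history.append(row)
        self.state[key] = history           # serialize + write whole list

    def retract(self, key, row):
        history = self.state.get(key, [])
        # O(n): scan the list to find the element to retract
        for i in range(len(history) - 1, -1, -1):
            if history[i] == row:
                del history[i]
                break
        self.state[key] = history

m = LegacyMaterializer()
m.add("k", "a"); m.add("k", "b"); m.retract("k", "a")
print(m.state["k"])  # ['b']
```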

As a result, in some of our production use cases, performance drops to 1-5 records per second. Besides everything else, such a low throughput often results in hard back-pressure and checkpointing problems (even with Unaligned Checkpoints enabled).

All of this renders the current implementation unusable in such cases and forces rewriting some queries (which is sometimes possible, but not always).


Solutions outline

The problem can be solved by:

  1. A more efficient implementation (which we introduce below)

  2. Cleaning up the state/history more aggressively (e.g. on watermark)

  3. Changing the SQL planner significantly so as to avoid SinkUpsertMaterializer or largely change its semantics (currently under investigation)

  4. Disallowing SinkUpsertMaterializer altogether and requiring users to rewrite their queries

  5. Probably, better support for out-of-order changelogs in downstream systems

The preferred solution depends on the use case and the timeline.

We presume that for most Flink users (including ourselves), a more efficient implementation (1) works best - at least in the short to mid term.

This FLIP serves to synchronize the efforts in different directions and to document the design for (1).


Retraction-aware deduplication and similar use cases

Apart from that, there are some other cases where similar functionality could be useful:

  1. Existing FirstValueWithRetractAggFunction, LastValueWithRetractAggFunction

  2. Potentially, retraction-aware lag and lead functions

  3. Potentially, efficient retraction-aware deduplication operator

The last one could be especially useful; in fact, SinkUpsertMaterializer already deduplicates the retraction stream.

All of these cases could use the same data structure that is proposed in this FLIP.


Public Interfaces

New configuration option:

# The strategy to use in SinkUpsertMaterializer.

# Supported values:

# - LEGACY - simple implementation based on ValueState<List> (the original implementation)
# -- optimal for cases with history under approx. 100 elements
# -- limited TTL support (per key granularity, i.e. no expiration for old history elements)

# - MAP - OrderedMultiSet-based implementation built on a combination of several MapStates that maintain ordering and fast-lookup properties
# -- faster and more memory-efficient on long histories
# -- currently, no TTL support (to be added in the future)
# -- slower on short histories
# -- requires more space

# - VALUE - similar to LEGACY, but compatible with MAP
# -- allows switching to ADAPTIVE later

# - ADAPTIVE
# -- alternates between MAP and VALUE depending on the number of entries for the given key
# -- starts with VALUE and switches to MAP upon reaching the threshold.high value (and back to VALUE upon reaching threshold.low)

# The default is LEGACY

# The option takes effect during planning / compile plan generation.
# Existing jobs won't be affected by this option

table.exec.sink.upsert-materialize.strategy: [ LEGACY | MAP | VALUE | ADAPTIVE ]

# When using strategy=ADAPTIVE, defines the number of entries per key when the implementation is changed from VALUE to MAP
# The option takes effect during job (re)starting
table.exec.sink.upsert-materialize.adaptive.threshold.high:

# When using strategy=ADAPTIVE, defines the number of entries per key when the implementation is changed from MAP to VALUE
# The option takes effect during job (re)starting
table.exec.sink.upsert-materialize.adaptive.threshold.low: 


The threshold options above are @Experimental. When not specified, the default values will be used, which depend on the state backend used:

Parameter                                                    RocksDB   HashMap
table.exec.sink.upsert-materialize.adaptive.threshold.high   50        400
table.exec.sink.upsert-materialize.adaptive.threshold.low    40        300

To implement the logic of choosing the value based on the state backend, a new method getBackendTypeIdentifier will be added to KeyedStateStore:

// flink-core
package org.apache.flink.api.common.state

public interface KeyedStateStore {
    @Experimental
    String getBackendTypeIdentifier();
}

(the exact return type is an implementation detail).

Proposed Changes

Similar to the original implementation, the new one captures the history of the changes for a record (and not the point-in-time view).

However, instead of a ValueState<List>, it emulates Ordered(Multi)Set using the existing Flink state primitives:

  • the order is maintained by attaching a SequenceNumber (long) to every element

  • “linking” to the next and previous elements is done using SequenceNumbers

  • fast lookup by the SequenceNumber is achieved using MapState<SequenceNumber, Node>

  • fast lookup by RowData is achieved using MapState<RowData, SequenceNumber>

  • the latest SequenceNumber is kept in a ValueState

Note that due to retractions, there might be “holes” in the list; however, that doesn’t affect correctness as long as the neighboring nodes are relinked together.


class Node {
  row:                RowData
  seqNo:              long
  prevSeqNo:          long
  nextSeqNo:          long
  nextSeqNoForRecord: long
}
rowToSqn = MapState<RowData, Long> // maps each row to first non-removed SeqNo
sqnToNode = MapState<Long, Node> // maps SeqNo to Node
highestSqn = ValueState<Long> // last sequence-number
--------------------
function AddRow(row)
  highSqn = highestSqn.get()
  newSqn = (highSqn == null) ? 0 : highSqn + 1
  newNode = new Node(row, newSqn, /*prev*/ highSqn, /*next*/ null, /*nextForRecord*/ null)

  if rowToSqn.contains(row)
    // another copy of the same row exists: link its last copy to the new one
    sqn = rowToSqn.get(row)
    node = sqnToNode.get(sqn)
    while node.nextSeqNoForRecord != null // this chain is usually very short; a pointer to its last copy can keep this O(1)
      sqn = node.nextSeqNoForRecord
      node = sqnToNode.get(sqn)
    sqnToNode.put(sqn, node.withNextSeqNoForRecord(newSqn))
  else
    rowToSqn.put(row, newSqn)
  sqnToNode.put(newSqn, newNode)
  if highSqn != null
    // link the previous tail to the new node
    sqnToNode.put(highSqn, sqnToNode.get(highSqn).withNext(newSqn))
  highestSqn.set(newSqn)
  emit INSERT(row)
--------------------
function RetractRow(row)
  sqn = rowToSqn.get(row)
  if sqn == null then log error and return
  node = sqnToNode.get(sqn)
  curTail = highestSqn.get()
  remove(node)
  if highestSqn.get() == null
    emit DELETE(row)
  else if node.seqNo == curTail
    emit UPDATE_AFTER(sqnToNode.get(highestSqn.get()).row)
--------------------
function remove(node)
  sqnToNode.remove(node.seqNo)
  // link prev node to next
  if node.hasPrev
    sqnToNode.put(
      node.prevSeqNo,
      sqnToNode.get(node.prevSeqNo)
        .withNext(node.nextSeqNo))
  // link next node to prev
  if node.hasNext
    sqnToNode.put(
      node.nextSeqNo,
      sqnToNode.get(node.nextSeqNo)
        .withPrev(node.prevSeqNo))
  // node was the tail: update highestSqn
  else
    highestSqn.set(node.prevSeqNo)
  // advance or delete the sequence number for the record
  if node.nextSeqNoForRecord == null
    rowToSqn.remove(node.row)
  else
    rowToSqn.put(node.row, node.nextSeqNoForRecord)
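To make the algorithm easier to follow, here is a runnable Python model of the pseudocode above (a sketch only: plain dicts stand in for MapState, a plain attribute for ValueState, rows are strings, and the upsert-key path and error handling are omitted):

```python
# Runnable model of the Ordered(Multi)Set emulation. Plain dicts stand in
# for MapState, a plain attribute for ValueState. Not Flink code.

class Node:
    def __init__(self, row, seq_no, prev_sqn):
        self.row = row
        self.seq_no = seq_no
        self.prev_sqn = prev_sqn       # previous live node (None = head)
        self.next_sqn = None           # next live node (None = tail)
        self.next_for_record = None    # next copy of the same row

class Materializer:
    def __init__(self):
        self.row_to_sqn = {}    # row -> first non-removed SeqNo for that row
        self.sqn_to_node = {}   # SeqNo -> Node
        self.highest_sqn = None
        self.emitted = []       # messages "sent downstream"

    def add_row(self, row):
        high = self.highest_sqn
        new_sqn = 0 if high is None else high + 1
        self.sqn_to_node[new_sqn] = Node(row, new_sqn, high)
        if row in self.row_to_sqn:
            node = self.sqn_to_node[self.row_to_sqn[row]]
            while node.next_for_record is not None:  # walk to the last copy
                node = self.sqn_to_node[node.next_for_record]
            node.next_for_record = new_sqn
        else:
            self.row_to_sqn[row] = new_sqn
        if high is not None:
            self.sqn_to_node[high].next_sqn = new_sqn  # link the old tail
        self.highest_sqn = new_sqn
        self.emitted.append(("INSERT", row))

    def retract_row(self, row):
        sqn = self.row_to_sqn.get(row)
        if sqn is None:
            return  # the real operator logs an error here
        node = self.sqn_to_node[sqn]
        cur_tail = self.highest_sqn
        self._remove(node)
        if self.highest_sqn is None:
            self.emitted.append(("DELETE", row))
        elif node.seq_no == cur_tail:
            self.emitted.append(
                ("UPDATE_AFTER", self.sqn_to_node[self.highest_sqn].row))

    def _remove(self, node):
        del self.sqn_to_node[node.seq_no]
        if node.prev_sqn is not None:   # link prev -> next
            self.sqn_to_node[node.prev_sqn].next_sqn = node.next_sqn
        if node.next_sqn is not None:   # link next -> prev
            self.sqn_to_node[node.next_sqn].prev_sqn = node.prev_sqn
        else:                           # node was the tail
            self.highest_sqn = node.prev_sqn
        if node.next_for_record is None:
            del self.row_to_sqn[node.row]
        else:
            self.row_to_sqn[node.row] = node.next_for_record

m = Materializer()
m.add_row("a"); m.add_row("b"); m.retract_row("b")
print(m.emitted[-1])  # ('UPDATE_AFTER', 'a')
m.retract_row("a")
print(m.emitted[-1])  # ('DELETE', 'a')
```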

In the upsert-key case, the record is added at the existing position in the list (if one exists; otherwise, at the end).

Complexity analysis

Space

O(C * n), where

  • n is the number of records (“history size”)

  • C = 2 since we have two maps, both holding incoming records

Time

O(1), with 7 reads and 3 writes in the worst case (2 of the reads can be eliminated)

Switching the algorithm dynamically

The implementation based on ValueState might work better on short lists; for RocksDB, this means lists of under roughly 50 elements.

To minimize the regression in such cases, the ADAPTIVE implementation starts with VALUE and switches to MAP dynamically upon reaching a pre-configured threshold:

  • For RocksDB, this threshold is around 50 elements (given 250 byte records).
  • For Heap, this threshold is around 400 elements (given 250 byte records).

However, such switching incurs some performance penalty, which is visible on lists of up to ~10 elements and results in ADAPTIVE performance being ~20% lower than VALUE/LEGACY.

In such cases, to avoid the performance drop, users can choose either LEGACY or VALUE; the latter allows switching to the ADAPTIVE strategy later.
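The switching decision itself is a simple hysteresis rule between the two thresholds; a minimal sketch (names are illustrative; the values mirror the RocksDB defaults):

```python
# Hypothetical sketch of the ADAPTIVE switching decision: hysteresis
# between VALUE and MAP based on the per-key history size.

THRESHOLD_HIGH = 50  # illustrative; the RocksDB default
THRESHOLD_LOW = 40

def next_strategy(current, history_size):
    if current == "VALUE" and history_size >= THRESHOLD_HIGH:
        return "MAP"    # history grew long: switch to the map-based layout
    if current == "MAP" and history_size <= THRESHOLD_LOW:
        return "VALUE"  # history shrank enough: switch back
    return current      # inside the hysteresis band: keep the current layout

print(next_strategy("VALUE", 10))  # VALUE
print(next_strategy("VALUE", 50))  # MAP
print(next_strategy("MAP", 45))    # MAP  (no flip-flopping in the band)
print(next_strategy("MAP", 40))    # VALUE
```

The gap between the two thresholds prevents frequent back-and-forth migrations for keys whose history size hovers around a single threshold.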

Benchmark                            (adaptiveThreshold)  (hasUpsertKey)  (payloadSize)  (retractDelay)  (retractPercentage)  (stateBackend)  (strategy)  (ttlTimeDomain)   Mode  Cnt    Score    Error   Units
SinkUpsertMaterializerBenchmark.run                  400            true            250               2                  100         ROCKSDB    ADAPTIVE       EVENT_TIME  thrpt   10  174.303 ± 15.952  ops/ms
SinkUpsertMaterializerBenchmark.run                  400            true            250               2                  100         ROCKSDB       VALUE       EVENT_TIME  thrpt   10  218.455 ± 18.098  ops/ms
SinkUpsertMaterializerBenchmark.run                  400            true            250               2                  100         ROCKSDB      LEGACY       EVENT_TIME  thrpt   10  224.355 ± 19.125  ops/ms
SinkUpsertMaterializerBenchmark.run                  400            true            250              10                  100         ROCKSDB    ADAPTIVE       EVENT_TIME  thrpt   10  102.678 ±  7.179  ops/ms
SinkUpsertMaterializerBenchmark.run                  400            true            250              10                  100         ROCKSDB       VALUE       EVENT_TIME  thrpt   10  133.207 ±  6.577  ops/ms
SinkUpsertMaterializerBenchmark.run                  400            true            250              10                  100         ROCKSDB      LEGACY       EVENT_TIME  thrpt   10  129.972 ±  9.499  ops/ms


Benchmark results

The benchmark (see code below) inserts a fixed number of records (inserts and deletes) in a loop and measures the time it takes to handle all the records. It uses a test harness, not a full Flink job.

The benchmark has the following parameters:

  • total number of records - fixed to 10K; all records are inserted under the same stream key

  • SumStrategy (the SinkUpsertMaterializer strategy) - LEGACY, VALUE, MAP, ADAPTIVE

  • hasUpsertKey: true/false, stateBackend type: HEAP/ROCKSDB

  • payload size - fixed to 250 bytes

  • retractPercentage - fixed to 100% (i.e. all records are eventually retracted, except for the first "retractDelay" records)

  • retractDelay - from which record to start retraction

Both retractPercentage and retractDelay control how frequently retraction happens and how long the history grows. Retraction is inefficient with the LEGACY/VALUE strategies on long lists (because it's O(n)).


Note that in practice, results depend a lot on the payload and the generated equals and hashCode functions. The more CPU-intensive these functions are, the slower the LEGACY/VALUE strategies are compared to MAP/ADAPTIVE.

Also, the results below are preliminary and might slightly change after the proposed implementation is approved and merged.


As can be seen below:

  • VALUE strategy is superior for short histories, especially with Heap State Backend

  • starting at a certain history size, MAP becomes orders of magnitude faster (the exact size depends on the state backend, the upsert key, and the retraction percentage)

  • performance of VALUE/LEGACY continues to degrade as the list grows; MAP/ADAPTIVE stays roughly constant


RocksDB State Backend

With the ADAPTIVE strategy, performance degrades until the number of retained elements reaches 50; after that, the implementation switches to MAP, which keeps performance relatively stable.

On the other hand, performance of the LEGACY strategy continues to degrade and eventually drops almost to zero:

SinkUpsertMaterializerBenchmark.run                   50           false            250               2                  100         ROCKSDB    ADAPTIVE       EVENT_TIME  thrpt    3  285.419 ±  33.332  ops/ms
SinkUpsertMaterializerBenchmark.run                   50           false            250               2                  100         ROCKSDB      LEGACY       EVENT_TIME  thrpt    3  297.157 ± 109.149  ops/ms
SinkUpsertMaterializerBenchmark.run                   50           false            250              10                  100         ROCKSDB    ADAPTIVE       EVENT_TIME  thrpt    3  146.888 ±  32.391  ops/ms
SinkUpsertMaterializerBenchmark.run                   50           false            250              10                  100         ROCKSDB      LEGACY       EVENT_TIME  thrpt    3  153.472 ±  95.353  ops/ms
SinkUpsertMaterializerBenchmark.run                   50           false            250              50                  100         ROCKSDB    ADAPTIVE       EVENT_TIME  thrpt    3   35.537 ±   5.646  ops/ms
SinkUpsertMaterializerBenchmark.run                   50           false            250              50                  100         ROCKSDB      LEGACY       EVENT_TIME  thrpt    3   51.073 ±  29.070  ops/ms
SinkUpsertMaterializerBenchmark.run                   50           false            250             100                  100         ROCKSDB    ADAPTIVE       EVENT_TIME  thrpt    3   34.679 ±  29.491  ops/ms
SinkUpsertMaterializerBenchmark.run                   50           false            250             100                  100         ROCKSDB      LEGACY       EVENT_TIME  thrpt    3   28.884 ±  18.927  ops/ms
SinkUpsertMaterializerBenchmark.run                   50           false            250            1000                  100         ROCKSDB    ADAPTIVE       EVENT_TIME  thrpt    3   29.265 ±  10.787  ops/ms
SinkUpsertMaterializerBenchmark.run                   50           false            250            1000                  100         ROCKSDB      LEGACY       EVENT_TIME  thrpt    3    3.333 ±   2.242  ops/ms
SinkUpsertMaterializerBenchmark.run                   50           false            250            5000                  100         ROCKSDB    ADAPTIVE       EVENT_TIME  thrpt    3   22.494 ±   3.781  ops/ms
SinkUpsertMaterializerBenchmark.run                   50           false            250            5000                  100         ROCKSDB      LEGACY       EVENT_TIME  thrpt    3    0.665 ±   0.234  ops/ms

It can also be seen that when not all records are retracted, performance of the current implementation drops dramatically:

75% retraction
Benchmark                            (adaptiveThreshold)  (hasUpsertKey)  (payloadSize)  (retractDelay)  (retractPercentage)  (stateBackend)  (strategy)  (ttlTimeDomain)   Mode  Cnt   Score    Error   Units
SinkUpsertMaterializerBenchmark.run                   50           false            250               2                   75         ROCKSDB    ADAPTIVE       EVENT_TIME  thrpt    3  20.045 ±  6.800  ops/ms
SinkUpsertMaterializerBenchmark.run                   50           false            250               2                   75         ROCKSDB      LEGACY       EVENT_TIME  thrpt    3   0.535 ±  0.407  ops/ms
SinkUpsertMaterializerBenchmark.run                   50           false            250              10                   75         ROCKSDB    ADAPTIVE       EVENT_TIME  thrpt    3  19.976 ±  7.006  ops/ms
SinkUpsertMaterializerBenchmark.run                   50           false            250              10                   75         ROCKSDB      LEGACY       EVENT_TIME  thrpt    3   0.534 ±  0.180  ops/ms
SinkUpsertMaterializerBenchmark.run                   50           false            250              50                   75         ROCKSDB    ADAPTIVE       EVENT_TIME  thrpt    3  19.407 ±  7.530  ops/ms
SinkUpsertMaterializerBenchmark.run                   50           false            250              50                   75         ROCKSDB      LEGACY       EVENT_TIME  thrpt    3   0.533 ±  0.296  ops/ms
SinkUpsertMaterializerBenchmark.run                   50           false            250             100                   75         ROCKSDB    ADAPTIVE       EVENT_TIME  thrpt    3  18.966 ± 12.128  ops/ms
SinkUpsertMaterializerBenchmark.run                   50           false            250             100                   75         ROCKSDB      LEGACY       EVENT_TIME  thrpt    3   0.535 ±  0.740  ops/ms
SinkUpsertMaterializerBenchmark.run                   50           false            250            1000                   75         ROCKSDB    ADAPTIVE       EVENT_TIME  thrpt    3  19.641 ±  5.399  ops/ms
SinkUpsertMaterializerBenchmark.run                   50           false            250            1000                   75         ROCKSDB      LEGACY       EVENT_TIME  thrpt    3   0.494 ±  0.182  ops/ms
SinkUpsertMaterializerBenchmark.run                   50           false            250            5000                   75         ROCKSDB    ADAPTIVE       EVENT_TIME  thrpt    3  17.128 ±  3.324  ops/ms
SinkUpsertMaterializerBenchmark.run                   50           false            250            5000                   75         ROCKSDB      LEGACY       EVENT_TIME  thrpt    3   0.332 ±  0.358  ops/ms

Heap State Backend

Benchmark                            (adaptiveThreshold)  (hasUpsertKey)  (payloadSize)  (retractDelay)  (retractPercentage)  (stateBackend)  (strategy)  (ttlTimeDomain)   Mode  Cnt     Score     Error   Units
SinkUpsertMaterializerBenchmark.run                  400           false            250               2                  100            HEAP    ADAPTIVE       EVENT_TIME  thrpt   10  4365.903 ±  45.798  ops/ms
SinkUpsertMaterializerBenchmark.run                  400           false            250               2                  100            HEAP      LEGACY       EVENT_TIME  thrpt   10  3880.419 ±  63.156  ops/ms
SinkUpsertMaterializerBenchmark.run                  400           false            250              10                  100            HEAP    ADAPTIVE       EVENT_TIME  thrpt   10  3923.655 ±  36.138  ops/ms
SinkUpsertMaterializerBenchmark.run                  400           false            250              10                  100            HEAP      LEGACY       EVENT_TIME  thrpt   10  3397.935 ±  41.414  ops/ms
SinkUpsertMaterializerBenchmark.run                  400           false            250              50                  100            HEAP    ADAPTIVE       EVENT_TIME  thrpt   10  3114.705 ±  71.971  ops/ms
SinkUpsertMaterializerBenchmark.run                  400           false            250              50                  100            HEAP      LEGACY       EVENT_TIME  thrpt   10  2090.640 ±  23.502  ops/ms
SinkUpsertMaterializerBenchmark.run                  400           false            250             100                  100            HEAP    ADAPTIVE       EVENT_TIME  thrpt   10  2643.426 ±  78.599  ops/ms
SinkUpsertMaterializerBenchmark.run                  400           false            250             100                  100            HEAP      LEGACY       EVENT_TIME  thrpt   10  2015.997 ±  21.539  ops/ms
SinkUpsertMaterializerBenchmark.run                  400           false            250            1000                  100            HEAP    ADAPTIVE       EVENT_TIME  thrpt   10   583.310 ±   5.751  ops/ms
SinkUpsertMaterializerBenchmark.run                  400           false            250            1000                  100            HEAP      LEGACY       EVENT_TIME  thrpt   10   210.121 ±   3.533  ops/ms
SinkUpsertMaterializerBenchmark.run                  400            true            250               2                  100            HEAP    ADAPTIVE       EVENT_TIME  thrpt   10  4059.498 ± 108.323  ops/ms
SinkUpsertMaterializerBenchmark.run                  400            true            250               2                  100            HEAP      LEGACY       EVENT_TIME  thrpt   10  3503.332 ±  42.808  ops/ms
SinkUpsertMaterializerBenchmark.run                  400            true            250              10                  100            HEAP    ADAPTIVE       EVENT_TIME  thrpt   10  3769.139 ±  81.589  ops/ms
SinkUpsertMaterializerBenchmark.run                  400            true            250              10                  100            HEAP      LEGACY       EVENT_TIME  thrpt   10  3159.818 ±  27.248  ops/ms
SinkUpsertMaterializerBenchmark.run                  400            true            250              50                  100            HEAP    ADAPTIVE       EVENT_TIME  thrpt   10  2663.425 ±  43.302  ops/ms
SinkUpsertMaterializerBenchmark.run                  400            true            250              50                  100            HEAP      LEGACY       EVENT_TIME  thrpt   10  1554.082 ±  22.305  ops/ms
SinkUpsertMaterializerBenchmark.run                  400            true            250             100                  100            HEAP    ADAPTIVE       EVENT_TIME  thrpt   10  1987.133 ±  38.208  ops/ms
SinkUpsertMaterializerBenchmark.run                  400            true            250             100                  100            HEAP      LEGACY       EVENT_TIME  thrpt   10   974.601 ±  11.899  ops/ms
SinkUpsertMaterializerBenchmark.run                  400            true            250            1000                  100            HEAP    ADAPTIVE       EVENT_TIME  thrpt   10  1560.245 ±  21.188  ops/ms
SinkUpsertMaterializerBenchmark.run                  400            true            250            1000                  100            HEAP      LEGACY       EVENT_TIME  thrpt   10   103.242 ±   1.637  ops/ms
75% retraction
Benchmark                            (adaptiveThreshold)  (hasUpsertKey)  (payloadSize)  (retractDelay)  (retractPercentage)  (stateBackend)  (strategy)  (ttlTimeDomain)   Mode  Cnt    Score      Error   Units
SinkUpsertMaterializerBenchmark.run                  400           false            250               2                   75            HEAP    ADAPTIVE       EVENT_TIME  thrpt    3  631.694 ±  402.540  ops/ms
SinkUpsertMaterializerBenchmark.run                  400           false            250               2                   75            HEAP      LEGACY       EVENT_TIME  thrpt    3   48.380 ±   38.804  ops/ms
SinkUpsertMaterializerBenchmark.run                  400           false            250              10                   75            HEAP    ADAPTIVE       EVENT_TIME  thrpt    3  603.859 ±  230.745  ops/ms
SinkUpsertMaterializerBenchmark.run                  400           false            250              10                   75            HEAP      LEGACY       EVENT_TIME  thrpt    3   47.645 ±   17.438  ops/ms
SinkUpsertMaterializerBenchmark.run                  400           false            250              50                   75            HEAP    ADAPTIVE       EVENT_TIME  thrpt    3  513.087 ± 1100.741  ops/ms
SinkUpsertMaterializerBenchmark.run                  400           false            250              50                   75            HEAP      LEGACY       EVENT_TIME  thrpt    3   41.623 ±  108.601  ops/ms
SinkUpsertMaterializerBenchmark.run                  400           false            250             100                   75            HEAP    ADAPTIVE       EVENT_TIME  thrpt    3  637.903 ±  100.444  ops/ms
SinkUpsertMaterializerBenchmark.run                  400           false            250             100                   75            HEAP      LEGACY       EVENT_TIME  thrpt    3   46.522 ±   12.859  ops/ms
SinkUpsertMaterializerBenchmark.run                  400           false            250            1000                   75            HEAP    ADAPTIVE       EVENT_TIME  thrpt    3  619.512 ±  247.517  ops/ms
SinkUpsertMaterializerBenchmark.run                  400           false            250            1000                   75            HEAP      LEGACY       EVENT_TIME  thrpt    3   41.977 ±   15.712  ops/ms
SinkUpsertMaterializerBenchmark.run                  400           false            250            5000                   75            HEAP    ADAPTIVE       EVENT_TIME  thrpt    3  631.456 ±  103.807  ops/ms
SinkUpsertMaterializerBenchmark.run                  400           false            250            5000                   75            HEAP      LEGACY       EVENT_TIME  thrpt    3   29.616 ±   65.635  ops/ms


State Backend support

For RocksDB (or any backend that operates on serialized records), we rely on the binary form of the keys (RowData) instead of the generated record equalizer. For the upsert-key case, we project the records before using them as keys in RocksDB.
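The upsert-key projection can be sketched as follows (hypothetical; tuples stand in for serialized RowData, and `project` is an illustrative helper, not a Flink API):

```python
# Hypothetical sketch: with an upsert key, only the key fields are
# projected out of the row and used as the MapState key (in binary
# form when the backend is RocksDB).

def project(row, upsert_key_indices):
    """Keep only the upsert-key fields of the row."""
    return tuple(row[i] for i in upsert_key_indices)

row = ("id7", "v1", 42)
print(project(row, [0]))  # ('id7',)
```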

For HeapStateBackend, it is more tricky:

  1. It uses equals, which doesn’t employ the supplied record equalizer

  2. If we inject the equalizer as a “transient” field to every record, it would be lost on recovery

  3. On recovery, record type might change (e.g. from ProjectedRowData to GenericRowData), breaking equals / hashCode even if we solve (1)

  4. hashCode needs to be efficient enough, but the set of fields is not known at runtime

To address the above issues:

  • A wrapper around RowData is introduced and used as the key

  • The equalizer is injected into the wrapper, including during recovery

  • The equalizer is snapshotted into the meta info snapshot

  • A hashCode function is generated at planning time and handled the same way as the equalizer
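The wrapper idea can be sketched as follows (a hypothetical sketch in Python standing in for the generated Java code; `eq_fn`/`hash_fn` model the generated equalizer and hashCode functions):

```python
# Hypothetical sketch: wrap the row and inject the generated equality and
# hash functions, so a heap-based hash map compares rows consistently
# even if the concrete row class changes after recovery.

class RowDataKeyWrapper:
    def __init__(self, row, eq_fn, hash_fn):
        self.row = row
        self._eq = eq_fn      # stands in for the generated record equalizer
        self._hash = hash_fn  # stands in for the generated hash function

    def __eq__(self, other):
        return self._eq(self.row, other.row)

    def __hash__(self):
        return self._hash(self.row)

# rows as tuples; the "generated" functions compare only the first field
eq_fn = lambda a, b: a[0] == b[0]
hash_fn = lambda r: hash(r[0])

d = {}
d[RowDataKeyWrapper(("id1", "x"), eq_fn, hash_fn)] = 1
# a differently-represented row still matches, because equality and
# hashing go through the injected functions, not the row class itself
print(RowDataKeyWrapper(("id1", "y"), eq_fn, hash_fn) in d)  # True
```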

TTL support

In Flink, TTL is managed per state element and doesn't guarantee consistency across multiple states/elements. However, the proposed algorithm requires all three data structures (and even neighboring elements) to be consistent with each other. Reconciling all the inconsistencies arising from applying TTL would require non-trivial logic, which we deem error-prone. Besides that, updating pointers in neighboring nodes "artificially" bumps their TTL.

All that makes the current TTL mechanism in Flink unsuitable for the proposed operator.

Instead, we’re planning to support TTL on operator level. The simplest approach is to rely on the insertion order property that we have for the history:

  • Attach insertion timestamp to every record when adding

  • Then, every N seconds (on system time):

    • 1. If the list is empty, return

    • 2. If the timestamp of the first element in the list <= now - TTL:
         - unlink the node (sqnToNodeState)
         - update rowToSqn if needed (rowToSqnState)
         - clear highestSqnState if it was the latest element

    • 3. Return to (1)

This solution is much simpler than reconciling inconsistencies caused by “system-level” TTL - at the cost of somewhat lower efficiency.
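The cleanup loop can be sketched as follows (a sketch: a deque of (timestamp, row) pairs stands in for the linked history, and timestamps are plain numbers; the real operator would also maintain rowToSqn and highestSqn as noted in the steps above):

```python
# Sketch of operator-level TTL: because the history is kept in insertion
# order, expiration only ever removes elements from the head.

from collections import deque

def expire(history, now, ttl):
    """history: deque of (insert_ts, row), oldest first."""
    expired = []
    while history and history[0][0] <= now - ttl:
        expired.append(history.popleft())  # unlink the head node
        # in the real operator: also update rowToSqn and,
        # if this was the last element, clear highestSqn
    return expired

h = deque([(100, "a"), (150, "b"), (300, "c")])
print(expire(h, now=360, ttl=200))  # [(100, 'a'), (150, 'b')]
print(list(h))                      # [(300, 'c')]
```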

Apart from that, in the upsert-key case, an entry might be updated in the middle of the list; this might delay the expiration of (potentially older) subsequent elements. This could be solved by a more elaborate cleanup strategy based on additional pointers, but for now it seems to be a very narrow case.

While TTL is crucial for SinkUpsertMaterializer (and in particular for the proposed implementation, which doubles the state size), we plan to add TTL support as a follow-up effort (storing the timestamps along with every record will be implemented from the get-go).

Note that in the original implementation, TTL only works well in the non-degenerate case of an ever-growing, effectively infinite key space. It doesn't expire old history elements for keys that keep receiving new elements.

Extracting a reusable primitive to use in other operators or functions

As discussed in the motivation section, there are existing and potential use cases for the data structure that we introduce here.

Additionally, extracting an interface could help with switching the implementation, testing, migration in the future.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?

    • Existing users won't be impacted: the feature takes effect only for those who explicitly enable it, and only for new jobs

  • If we are changing behavior how will we phase out the older behavior?

    • Once we add TTL support, we’ll make the new version default and deprecate the old one

  • If we need special migration tools, describe them here.

    • The new implementation is state-incompatible with the current one

    • In the first iteration, we don’t plan to provide migration tools to minimize the risks (e.g. of state corruption during migration) and effort

    • Switching dynamically between VALUE and MAP opens up the possibility of supporting migration from LEGACY; however, such migration is out of scope for this FLIP

  • When will we remove the existing behavior?

    • Once the above is implemented, the new operator could also be used for the existing jobs, and the original one can be removed

Test Plan

  • We’ll extend the existing test suite to cover more cases using old and new implementation

  • We’ll add micro benchmarks - either as standalone tests to flink repo, or as JMH benchmarks to apache/flink-benchmarks

Rejected Alternatives

See also Solutions Outline for solutions that are orthogonal to this one.


Not supporting Heap State backend

We decided to support the Heap State Backend as well (rather than restricting the new algorithm to certain backend types). Otherwise, changing the state backend wouldn't be possible, even through a canonical savepoint.


Extracting a new State primitive

This would allow customizing the behavior for different state backends. However, an O(n) algorithm becomes a problem for any state backend sooner or later. In theory, we could switch dynamically from the current to the proposed algorithm at run time, based on N, with different thresholds for different backends. However, this introduces significant risks and complexity, which at the moment don't seem justified.


Relying on orderliness property of state backend

An alternative way to get an OrderedMap is to rely on the property of RocksDBMapState of being implicitly sorted (as it's backed by a Sorted String Table).

"Linked" nodes won't be necessary with this approach. This would allow to some extent reduce the number of state accesses and simplify the code among other benefits.

However, we'd have to use iterators, e.g. to find the penultimate element to emit when the last element is retracted. In our experience, iterators are much slower than point lookups.


References

[1] https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-event-out-of-orderness