Discussion threadhttps://lists.apache.org/thread/pd9gzslq20dtzzfphxqvwhc43hrzo2y1
Vote threadhere (<- link to https://lists.apache.org/thread/xc0c4zsq4qlyot4vcbdxznr9ocnd0pm0

here (<- link to FLINK-34219 - Getting issue details... STATUS


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Currently, when performing cascading connections in Flink, there is a pain point of record amplification. This FLIP aims to effectively solve this problem .

As mentioned in discussion,  record amplification would be quite pronounced when using Flink to perform join operations with outer joins on 10 tables.  An update to the first table will trigger an exponential increase in the number of records. Specifically, the first join operation will generate two records: a delete and a insert. Subsequent join operations double the number of incoming records, so that the second operation yields four records. This pattern continues until the last join operation generates 2^10 which is 1024 records. A diagram illustrating the record amplification is shown as below. This cumulative effect of record amplification can lead to a significant decrease in system performance and a sharp increase in resource consumption.

Public Interfaces

The is no additional change about public interfaces. 

Three options about minibatch is required to set to make minibatch join work. The example is as following. Go to options for more details about these three options .

table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 2s
table.exec.mini-batch.size: 50000

Proposed Changes

A new operator called MiniBatchStreamingJoinOperator which inherits from StreamingJoinOperator is introduced. It now supports four types of joins: left join, right join, full join and inner join (not support semi / anti join). 

It will use a block of memory as a minibatch to store data input from the left and right streams like HeapBufferedBundle for GroupAggregate operation. The records in the minibatch is processed when triggered by minibatch size or watermark. The following is the UML of MiniBatchStreamingJoinOperator.

The explaination of 3 points of optimization and its corresponding scenarios could be referenced in the appendix  and the nexmark metrics is also listed. Besides, go to implementation could get details of the POC.

Compatibility, Deprecation, and Migration Plan

The proposal exclusively impacts the functionality of the streamingJoinOperator when handling paired records. A switch is used to make sure streamingJoinOperator works as before when disabling minibatch join. 

Test Plan

The test will focus on

  • Functionality Test

    • The operator-level MiniBatchStreamingJoinOperator works as expected.

    • The BufferBundle works as expected.

Rejected Alternatives


  • No labels