Discussion thread


Vote thread


ISSUE

https://github.com/apache/incubator-paimon/issues/2270

Release

TBD

Motivation

Flink Dual Stream Join

Core Challenge:

Growth of State Storage: In a real-time stream join, Flink must keep in state the records that are still waiting to be joined. If one stream's data arrives much faster than the other's, this state can grow very large and cause storage and performance problems.

Flink Lookup Join

Core Challenge:

A lookup join reacts only to changes in the main stream. Modifications in the dimension table cannot update rows that have already been joined. As a result, building data enrichment jobs in Flink usually requires developers with extensive business experience. They need a deep understanding of the size of the business data and how quickly it changes. They must also configure a T+1 scheduled job that corrects the data every day at midnight.

Partial Update (Sequence-Group)

Core Challenge:

Common Primary Key Requirement for Join Tables: Partial update can only merge data sources that share a common primary key. Every table taking part in the join must therefore have the same primary key so that rows can be matched.

Overview

In data pipelines built with Flink and CDC, data often has to be enriched in real time by merging multiple tables into a single wide table. Consider three interconnected tables, A, B, and C, that form a join chain: A joins B, and B joins C.

Our goal is to update the aggregate view of tables A, B, and C in real time.

Traditional Lookup Join Limitations

The traditional lookup join responds only to changes in the main data stream. If a related dimension table (such as B or C) changes, rows that have already been joined are not updated. A lookup is triggered only when a new event arrives on the main stream. Updates to dimension tables never change existing join results.
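The limitation can be illustrated with a minimal in-memory sketch. It assumes a hypothetical main stream A joined to a dimension table B on a field m. The class and method names are illustrative only and are not Flink's lookup join API. Each main-stream event looks up B once. A later change to B only updates the lookup cache, so the row already emitted keeps the stale value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a lookup join: only main-stream events trigger lookups.
class TraditionalLookupJoin {
    // Dimension table B, keyed by the join field m (hypothetical schema).
    final Map<String, String> dimB = new HashMap<>();
    // Joined rows emitted downstream; once emitted they are never revisited.
    final List<String> output = new ArrayList<>();

    // Called for every event of main stream A: look up B once and emit the joined row.
    void onMainEvent(String aKey, String m) {
        String bValue = dimB.getOrDefault(m, "null");
        output.add(aKey + "|" + m + "|" + bValue);
    }

    // Called for every change to dimension table B: only the cache is updated,
    // so rows already in output keep the old value.
    void onDimensionChange(String m, String newValue) {
        dimB.put(m, newValue);
    }

    public static void main(String[] args) {
        TraditionalLookupJoin join = new TraditionalLookupJoin();
        join.onDimensionChange("m1", "b-v1");
        join.onMainEvent("a1", "m1");          // emits a1|m1|b-v1
        join.onDimensionChange("m1", "b-v2");  // produces no new output
        System.out.println(join.output);       // [a1|m1|b-v1]
    }
}
```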

New Strategy: Dynamic Dimension Table-Driven Lookup Join

To overcome this limitation, we adopt a new strategy in which all tables (A, B, and C) are treated as dynamic dimension tables. No table is regarded as static, and changes to any of them are reflected in the main table. The core idea is to use changes in dimension tables to trigger data enrichment.

For example, when table C changes, we first use the n field of the changed row to find the related rows in table B, and then read their m field. Each m value becomes the primary key for the lookup join, which updates the aggregate view of A, B, and C in real time.
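The propagation from C to B to the wide view can be sketched in memory as follows. This is a hypothetical model, not the proposed Paimon implementation. It assumes A and B are both keyed by m, B carries a field n that references C, and C is keyed by n. A reverse index on B.n lets a change in C find the affected m keys. Each of those keys is then re-joined against the current contents of A, B, and C.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch: every table is a dynamic dimension table, and a change in any of them
// triggers a lookup join on m that refreshes the wide view.
class DimensionDrivenJoin {
    // Assumed schemas: A(m -> aVal), B(m -> n, bVal), C(n -> cVal).
    final Map<String, String> tableA = new HashMap<>();
    final Map<String, String> bToN = new HashMap<>();    // B: m -> n
    final Map<String, String> bValues = new HashMap<>(); // B: m -> bVal
    final Map<String, String> tableC = new HashMap<>();
    // Reverse index on B.n so that a change in C can find the related m keys.
    final Map<String, Set<String>> nToM = new HashMap<>();
    // Materialized wide view keyed by m, stored as "aVal|bVal|cVal".
    final Map<String, String> wideView = new TreeMap<>();

    void upsertA(String m, String aVal) {
        tableA.put(m, aVal);
        refresh(m);
    }

    void upsertB(String m, String n, String bVal) {
        String oldN = bToN.put(m, n);
        if (oldN != null) {
            Set<String> old = nToM.get(oldN);
            if (old != null) old.remove(m);
        }
        nToM.computeIfAbsent(n, k -> new HashSet<>()).add(m);
        bValues.put(m, bVal);
        refresh(m);
    }

    void upsertC(String n, String cVal) {
        tableC.put(n, cVal);
        // C changed: use n to find the related B rows, take their m, and re-join on m.
        for (String m : nToM.getOrDefault(n, Collections.emptySet())) {
            refresh(m);
        }
    }

    // Lookup join on m: rebuild the wide row from the current contents of A, B, and C.
    private void refresh(String m) {
        String n = bToN.get(m);
        String cVal = (n == null) ? null : tableC.get(n);
        wideView.put(m, tableA.get(m) + "|" + bValues.get(m) + "|" + cVal);
    }

    public static void main(String[] args) {
        DimensionDrivenJoin join = new DimensionDrivenJoin();
        join.upsertA("m1", "a1");
        join.upsertB("m1", "n1", "b1");
        join.upsertC("n1", "c1");
        System.out.println(join.wideView); // {m1=a1|b1|c1}
        join.upsertC("n1", "c2");          // a change in C alone updates the wide row
        System.out.println(join.wideView); // {m1=a1|b1|c2}
    }
}
```

The reverse index trades extra state on B for not having to scan B on every change in C. A production design would keep such indexes in the table storage rather than in heap memory.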

Optimization: Partial-Update for Large Table Challenges

When the changed tables share a common primary key and are suitable for partial updates, we dispatch each update directly by that key. This avoids the cost of loading an entire large table into the lookup cache.
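The direct dispatch idea can be sketched as a simplified, in-memory model in the spirit of a partial-update merge. In this model each source table writes only its own columns, and only non-null values overwrite existing ones. The class, method, and column names are hypothetical and do not correspond to Paimon's API. No lookup cache is needed, because every change already carries the shared primary key of the row it belongs to.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of direct update dispatch for tables that share a primary key.
class PartialUpdateDispatch {
    // Wide rows keyed by the shared primary key; each row maps column name -> value.
    final Map<String, Map<String, String>> rows = new TreeMap<>();

    // Dispatch a change from any source table directly to the row with the same key.
    // Null values are skipped, so one source never erases columns written by another.
    void dispatch(String key, Map<String, String> changedColumns) {
        Map<String, String> row = rows.computeIfAbsent(key, k -> new TreeMap<>());
        for (Map.Entry<String, String> e : changedColumns.entrySet()) {
            if (e.getValue() != null) {
                row.put(e.getKey(), e.getValue());
            }
        }
    }

    public static void main(String[] args) {
        PartialUpdateDispatch dispatcher = new PartialUpdateDispatch();
        dispatcher.dispatch("k1", Map.of("a_col", "a1")); // change from table A
        Map<String, String> fromB = new HashMap<>();      // HashMap allows null values
        fromB.put("a_col", null);
        fromB.put("b_col", "b1");
        dispatcher.dispatch("k1", fromB);                 // change from table B
        System.out.println(dispatcher.rows);              // {k1={a_col=a1, b_col=b1}}
    }
}
```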

Conclusion

By treating all data tables as dynamic dimension tables and considering all changes as signals to trigger data enrichment, we have successfully addressed the issues faced by the traditional lookup join method. Now, changes in any table can immediately trigger updates, ensuring data completeness and timeliness. This method provides an efficient and flexible solution for real-time data enrichment in data stream processing.

Implementation


Compatibility, Deprecation, and Migration Plan

This is a new, additive feature.

Test Plan

Unit Tests

Integration Tests (IT): verification of the join logic.
