Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread/pvylkyfc5yq3tks0vx7kpzt5k8gjy0x1

Vote thread: https://lists.apache.org/thread/75gb59qhm25d19fl1b86fh0p9dyym72y

JIRA: FLINK-37836

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In Flink streaming jobs, the large state of Join nodes has been a persistent concern for users. Since streaming jobs are long-running, the state of join generally increases in size over time. Although users can set the state TTL to mitigate this issue, it is not applicable to all scenarios and does not provide a fundamental solution. An oversized state can lead to a series of problems, including but not limited to:

  1. Resource bottlenecks in individual tasks

  2. Slow checkpointing, which affects job stability during the checkpointing process

  3. Long recovery time from state

In addition, when analyzing the join state, we find that in some scenarios, the state within the join actually contains redundant data from the source tables.

To address this issue, we aim to introduce Delta Join, which is based on the core idea of leveraging a bidirectional lookup join approach to reuse source table data as a substitute for join state.

What is Delta Join

In simple terms, Delta Join is a join algorithm that does not require the maintenance of intermediate states.

The theoretical foundation of Delta Join is as follows. Taking the join of two streams as an example, the formula for calculating the incremental part of the join result is:

$$\Delta(A \Join B) = \Delta A \Join B + A \Join \Delta B + \Delta A \Join \Delta B = \Delta A \Join (B + \Delta B) + A \Join \Delta B$$

Here, $A$ represents the complete historical data of the left table, while $\Delta A$ denotes the newly added data in the left table. The definitions for $B$ and $\Delta B$ follow similarly.

Example

If we have a regular join of two streams, we can optimize it into a Delta Join, where each side looks up the counterpart source table instead of maintaining join state.

Whenever we need to compute the incremental part of the join result, we only need to examine the incremental data generated in the source tables since the last computation, and query the historical snapshot data from the counterpart source table.
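To make the formula concrete, here is a toy, self-contained sketch (hypothetical rows with a single equi-join key; not Flink code) that checks that the previous result plus the incremental part equals a full recomputation:

```java
import java.util.*;

public class DeltaJoinDemo {

    // Toy inner join of (key, payload) rows on equal keys, emitting "l|r" strings.
    static List<String> join(List<List<String>> left, List<List<String>> right) {
        List<String> out = new ArrayList<>();
        for (List<String> l : left) {
            for (List<String> r : right) {
                if (l.get(0).equals(r.get(0))) {
                    out.add(l.get(1) + "|" + r.get(1));
                }
            }
        }
        return out;
    }

    static List<List<String>> concat(List<List<String>> x, List<List<String>> y) {
        List<List<String>> all = new ArrayList<>(x);
        all.addAll(y);
        return all;
    }

    public static void main(String[] args) {
        List<List<String>> a = List.of(List.of("k1", "a1"));
        List<List<String>> deltaA = List.of(List.of("k2", "a2"));
        List<List<String>> b = List.of(List.of("k1", "b1"), List.of("k2", "b2"));
        List<List<String>> deltaB = List.of(List.of("k1", "b3"));

        // Previous result: A JOIN B.
        List<String> oldResult = join(a, b);
        // Incremental part per the formula: deltaA JOIN (B + deltaB) + A JOIN deltaB.
        List<String> delta = new ArrayList<>(join(deltaA, concat(b, deltaB)));
        delta.addAll(join(a, deltaB));
        // Ground truth: full recomputation (A + deltaA) JOIN (B + deltaB).
        List<String> full = join(concat(a, deltaA), concat(b, deltaB));

        List<String> incremental = new ArrayList<>(oldResult);
        incremental.addAll(delta);
        // The two multisets match (order aside), confirming the decomposition.
        Collections.sort(incremental);
        Collections.sort(full);
        System.out.println(incremental.equals(full)); // prints true
    }
}
```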

From a theoretical standpoint, since we require 1. awareness of $\Delta$ data and 2. access to the historical snapshot data of the source table, it is crucial that the source table supports snapshot isolation to ensure strong consistency semantics. However, several issues currently exist:

  • Only a limited number of storage systems, such as Paimon, Iceberg and Hudi, support snapshotting.

  • The time interval for snapshot generation is at least on the order of minutes, leading to delays due to micro-batching.

  • When querying the counterpart source table, it is necessary to specify the snapshot to retrieve data, and snapshots may expire within the storage system.

Due to the concerns outlined above, we propose a Delta Join that meets real-time requirements with eventual consistency.

Delta Join with Eventual Consistency

The Delta Join with eventual consistency does not require snapshot isolation for the source table data. Instead, it always looks up the latest table data from the counterpart table during each join operation. The computation formula for the result is as follows:

$$\Delta A \Join (B + \Delta B) + (A + \Delta A) \Join \Delta B$$

Compared to strong consistency semantics, this formula introduces an additional portion of intermediate results, $\Delta A \Join \Delta B$. As a result, this approach can only ensure eventual consistency of the final result while providing low latency.

However, queries must also adhere to certain constraints to be eligible for this optimization. For detailed information, please refer to the limitations section.
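The duplicated $\Delta A \Join \Delta B$ term can also be observed in a toy run. In this hypothetical example (not Flink code), the strongly consistent delta emits the matching row once, while the eventually consistent formula emits it twice:

```java
import java.util.*;

public class EventualDeltaDemo {

    // Toy inner join of (key, payload) rows on equal keys, emitting "l|r" strings.
    static List<String> join(List<List<String>> left, List<List<String>> right) {
        List<String> out = new ArrayList<>();
        for (List<String> l : left) {
            for (List<String> r : right) {
                if (l.get(0).equals(r.get(0))) {
                    out.add(l.get(1) + "|" + r.get(1));
                }
            }
        }
        return out;
    }

    static List<List<String>> concat(List<List<String>> x, List<List<String>> y) {
        List<List<String>> all = new ArrayList<>(x);
        all.addAll(y);
        return all;
    }

    public static void main(String[] args) {
        List<List<String>> a = List.of(List.of("k1", "a1"));
        List<List<String>> deltaA = List.of(List.of("k2", "a2"));
        List<List<String>> b = List.of(List.of("k1", "b1"));
        List<List<String>> deltaB = List.of(List.of("k2", "b2"));

        // Strongly consistent delta: deltaA JOIN (B + deltaB) + A JOIN deltaB.
        List<String> strict = new ArrayList<>(join(deltaA, concat(b, deltaB)));
        strict.addAll(join(a, deltaB));

        // Eventually consistent delta: deltaA JOIN (B + deltaB) + (A + deltaA) JOIN deltaB.
        List<String> eventual = new ArrayList<>(join(deltaA, concat(b, deltaB)));
        eventual.addAll(join(concat(a, deltaA), deltaB));

        System.out.println(strict);   // prints [a2|b2]
        System.out.println(eventual); // prints [a2|b2, a2|b2] -- deltaA JOIN deltaB appears twice
    }
}
```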

Comparison with Streaming Join

Here are the differences between the Streaming Join and the two types of Delta Join with different consistency guarantees:

| | Streaming Join | Delta Join with Strong Consistency | Delta Join with Eventual Consistency |
| --- | --- | --- | --- |
| latency | low | high | low |
| state size | big | middle | almost none |
| state details | All historical detailed data from both sides of the input. | The accumulated incomplete delta data from both sides. | An ordered join key queue that is waiting to be joined. |
| data consistency | strong | strong | eventual (has redundant intermediate data) |
| has limitations | no | no | yes |

Public Interfaces

Configuration Options

A series of new configuration options related to Delta Join will be introduced.

// Whether to enable delta join.
// AUTO: The optimizer will try to use delta join first; if that fails, it falls back to a regular join.
// FORCE: Enforce the use of delta join. If it cannot be applied, an exception will be thrown.
// NONE: Don't try to use delta join.
// Default is 'AUTO'.
table.optimizer.delta-join.strategy: AUTO/FORCE/NONE
 
// whether to enable the cache in delta join
// default is true
table.exec.delta-join.cache-enable: true/false
 
// the cache size of the left side in delta join
// default is 10000L
table.exec.delta-join.left-cache-size: 10000L
 
// the cache size of the right side in delta join
// default is 10000L
table.exec.delta-join.right-cache-size: 10000L
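Assuming these options are adopted as proposed, they could be set per job with the standard Flink SQL `SET` statement (the option keys are those listed above; the values here are only illustrative):

```sql
-- Force the optimizer to use delta join; the query fails if it cannot be applied.
SET 'table.optimizer.delta-join.strategy' = 'FORCE';
-- Shrink the per-side lookup caches (defaults are 10000).
SET 'table.exec.delta-join.left-cache-size' = '5000';
SET 'table.exec.delta-join.right-cache-size' = '5000';
```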

Introduce Index in CatalogTable

Note: This FLIP does not currently cover the introduction of index syntax in SQL.

The POJO for Index is shown below.

/**
 * An index is a copy of selected columns of data, from a table, that is designed to enable very
 * efficient search. An index normally includes a "key" or direct link to the original row of data
 * from which it was copied, to allow the complete row to be retrieved efficiently.
 */
@Experimental
public interface Index {

    /** Get the name of this index. */
    String getName();

    /** Get the columns of this index. */
    List<String> getColumns();

    /** Print the index in a readable way. */
    String asSummaryString();
}
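As a self-contained illustration, a minimal implementation of this interface might look like the sketch below. The interface is reproduced inside the demo class so the example compiles on its own; the `DefaultIndex` and `IndexDemo` names are hypothetical and not part of the proposal:

```java
import java.util.*;

public class IndexDemo {

    // Reproduction of the proposed Index interface for a self-contained example.
    interface Index {
        String getName();
        List<String> getColumns();
        String asSummaryString();
    }

    // Hypothetical straightforward implementation backed by an immutable column list.
    static final class DefaultIndex implements Index {
        private final String name;
        private final List<String> columns;

        DefaultIndex(String name, List<String> columns) {
            this.name = name;
            this.columns = List.copyOf(columns);
        }

        @Override public String getName() { return name; }
        @Override public List<String> getColumns() { return columns; }
        @Override public String asSummaryString() {
            return String.format("INDEX %s (%s)", name, String.join(", ", columns));
        }
    }

    public static void main(String[] args) {
        Index idx = new DefaultIndex("INDEX_user_id", List.of("user_id"));
        System.out.println(idx.asSummaryString()); // prints INDEX INDEX_user_id (user_id)
    }
}
```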

Add indexes in Schema.

@PublicEvolving
  public final class Schema {

    ...
    private final List<UnresolvedIndex> indexes;

    public Schema(
      List<UnresolvedColumn> columns,
      List<UnresolvedWatermarkSpec> watermarkSpecs,
      @Nullable UnresolvedPrimaryKey primaryKey) {
      this(columns, watermarkSpecs, primaryKey, Collections.emptyList());
    }

    public Schema(
      List<UnresolvedColumn> columns,
      List<UnresolvedWatermarkSpec> watermarkSpecs,
      @Nullable UnresolvedPrimaryKey primaryKey,
      List<UnresolvedIndex> indexes) {
      ...
    }

    ...

    public List<UnresolvedIndex> getIndexes() {
      return indexes;
    }

    ...

    @PublicEvolving
      public static final class Builder {

        ...

        private final List<UnresolvedIndex> indexes;

        ...
      }

    /**
         * Declares an index for a set of given columns. Indexes are designed to enable very
         * efficient search. The indexes are informational only and can be used for optimizations.
         * It is the data owner's responsibility to guarantee the index queries allow the complete
         * row to be retrieved efficiently.
         *
         * <p>The index will be assigned a generated name in the format {@code INDEX_col1_col2}.
         *
         * @param columns columns that form a table index
         */
    public Builder index(String... columns) {
      Preconditions.checkNotNull(columns, "Index column names must not be null.");
      return index(Arrays.asList(columns));
    }

    /**
         * Declares an index for a set of given columns. Indexes are designed to enable very
         * efficient search. The indexes are informational only and can be used for optimizations.
         * It is the data owner's responsibility to guarantee the index queries allow the complete
         * row to be retrieved efficiently.
         *
         * <p>The index will be assigned a generated name in the format {@code INDEX_col1_col2}.
         *
         * @param columns columns that form a table index
         */
    public Builder index(List<String> columns) {
      Preconditions.checkNotNull(columns, "Index column names must not be null.");
      final String generatedIndexName =
        columns.stream().collect(Collectors.joining("_", "INDEX_", ""));
      return indexNamed(generatedIndexName, columns);
    }

    /**
         * Declares an index for a set of given columns. Indexes are designed to enable very
         * efficient search. The indexes are informational only and can be used for optimizations.
         * It is the data owner's responsibility to guarantee the index queries allow the complete
         * row to be retrieved efficiently.
         *
         * @param indexName the name of the index
         * @param columns columns that form a table index
         */
    public Builder indexNamed(String indexName, List<String> columns) {
      Preconditions.checkNotNull(indexName, "Index name must not be null.");
      Preconditions.checkNotNull(columns, "Index column names must not be null.");
      this.indexes.add(new UnresolvedIndex(indexName, columns));
      return this;
    }

    /** Declaration of an index that will be resolved to {@link Index} during schema resolution. */
    @Experimental
      public static final class UnresolvedIndex {
        private final String indexName;
        private final List<String> columnNames;

        public UnresolvedIndex(String indexName, List<String> columnNames) {
          this.indexName = indexName;
          this.columnNames = columnNames;
        }

        // getters / equals / hashCode
        ...

        @Override
        public String toString() {
          return String.format(
            "INDEX %s (%s)",
            EncodingUtils.escapeIdentifier(getIndexName()),
            getColumnNames().stream()
            .map(EncodingUtils::escapeIdentifier)
            .collect(Collectors.joining(", ")));
        }
      }
  }
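The generated-name logic in `Builder#index(List)` can be exercised in isolation. This small sketch reproduces the `Collectors.joining("_", "INDEX_", "")` expression from the builder above (the `IndexNameDemo` wrapper is hypothetical):

```java
import java.util.*;
import java.util.stream.Collectors;

public class IndexNameDemo {

    // Mirrors the name generation proposed in Schema.Builder#index(List):
    // column names joined by '_' with an "INDEX_" prefix.
    static String generatedIndexName(List<String> columns) {
        return columns.stream().collect(Collectors.joining("_", "INDEX_", ""));
    }

    public static void main(String[] args) {
        System.out.println(generatedIndexName(Arrays.asList("user_id", "item_id")));
        // prints INDEX_user_id_item_id
    }
}
```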

Add indexes in ResolvedSchema.

@PublicEvolving
public final class ResolvedSchema {

    ...
    private final List<Index> indexes;

    public ResolvedSchema(
            List<Column> columns,
            List<WatermarkSpec> watermarkSpecs,
            @Nullable UniqueConstraint primaryKey) {
        this(columns, watermarkSpecs, primaryKey, Collections.emptyList());
    }

    public ResolvedSchema(
            List<Column> columns,
            List<WatermarkSpec> watermarkSpecs,
            @Nullable UniqueConstraint primaryKey,
            List<Index> indexes) {
        ...
    }

    // Getter
    ...
}

Proposed Changes

This FLIP will not dig into detailed implementation specifics, but it will offer a brief overview of the implementation approach.

When and How to convert

A new rule will be introduced in the PHYSICAL_REWRITE phase to transform the StreamPhysicalJoin into a StreamPhysicalDeltaJoin.

In certain situations, if the original StreamPhysicalJoin has an update kind trait of [UA, UB] (for example, if there is an aggregation node downstream or if the downstream sink is a retract sink), a Deduplicate node will be introduced after the StreamPhysicalDeltaJoin to convert the +U records back to -U and +U records.

Furthermore, even if downstream operators require the complete changelog with both UA and UB, we can still allow the upstream to attempt to generate a stream with only UA, provided that the Delta Join combined with a Deduplicate node suffices. Therefore, this rule also re-infers the update kind traits of the Delta Join inputs.

If the upstream cannot be completely transformed into a stream that excludes UB, or if it includes unsupported operators (for example, the changelog normalize operator is currently not on the whitelist), we abandon this optimization.

The general flowchart for the check process is as follows. For more details about these validations, please refer to the chapter "Limitations about Delta Join with Eventual Consistency".


Details in Delta Join operator

We will implement a DeltaJoinOperator, which includes two AsyncDeltaJoinRunners to perform tasks similar to those of AsyncLookupJoinRunner. Additionally, we will introduce caches on both sides to avoid frequent queries to external tables. Each side will maintain its own LRU Cache, which stores the data provided for queries to the counterpart. The cache size will be controlled by Flink configuration.

For example, when data arrives on the left side, it will first query the counterpart's cache. If there is a cache miss, it will then query the external table and store the result in the counterpart's cache. At the same time, the input-side cache will also need to be updated with the incoming data.
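As a rough sketch of such a per-side LRU cache (the real operator's cache layout and key/row types are implementation details not fixed by this FLIP), a `LinkedHashMap` in access order provides exactly the size-bounded eviction behavior described above:

```java
import java.util.*;

// Minimal size-bounded LRU cache: a LinkedHashMap in access order that
// evicts the least-recently-used entry once maxSize is exceeded.
public class LruCache<K, V> extends LinkedHashMap<K, V> {

    private final int maxSize;

    public LruCache(int maxSize) {
        super(16, 0.75f, true); // accessOrder = true gives LRU semantics
        this.maxSize = maxSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxSize;
    }

    public static void main(String[] args) {
        // Hypothetical cache keyed by join key, holding rows served to the counterpart.
        LruCache<String, String> rightCache = new LruCache<>(2);
        rightCache.put("k1", "row1");
        rightCache.put("k2", "row2");
        rightCache.get("k1");         // touch k1 so k2 becomes the eldest entry
        rightCache.put("k3", "row3"); // evicts k2
        System.out.println(rightCache.keySet()); // prints [k1, k3]
    }
}
```

In the operator itself the cache size would be driven by the `table.exec.delta-join.left-cache-size` / `right-cache-size` options above.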

Prerequisite

[FLIP-519] Support Async Keyed Ordered Lookup Join [1]

Limitations about Delta Join with Eventual Consistency

Strong Limitations

Any query that violates the limitations listed below cannot be optimized into a Delta Join.

  1. The final result only needs to tolerate eventual consistency.

  2. The join must contain equi-join conditions, and the join keys must be an index or include an index.

  3. Cascading joins are not supported yet.

  4. Only the INNER/LEFT/RIGHT/FULL join types are supported for now.

  5. All sources must support async LookupTableSource.

  6. Only specific stateless operators are allowed between the source and the join, as outlined in a whitelist. For example, the following nodes are included in the whitelist. In the future, more nodes can be added to it.

    1. TableScan with the source specs below:

      1. FilterPushdownSpec

      2. ProjectPushdownSpec

      3. ReadingMetadataSpec

      4. SourceWatermarkSpec

      5. WatermarkPushDownSpec

    2. Calc (without calculating the join keys)

    3. DropUpdateBefore

    4. WatermarkAssigner

    5. MiniBatchAssigner

    6. Exchange

Weak Limitations

In certain cases, these limitations can be relaxed.

  1. When the input changelog contains updates, the join key must be either the primary key or a part of a composite primary key.

  2. The input changelog must meet the following conditions:

    1. If it only includes [I] or [I, UA], there are no limitations.

    2. If it contains UB, optimization is not supported.

    3. If it contains D, then since the other side may also have D messages, the probe side might look up no data on the other side during the join process, which could result in no output. Therefore, the D message will be forwarded directly to the downstream operator, and the following constraints must be met:

      1. Only one input of the Delta Join is allowed to produce D:

        1. For inner joins, only one of the two inputs can produce D.

        2. For left joins, only the left input can produce D.

        3. For right joins, only the right input can produce D.

        4. For full joins, neither input can produce D.

      2. The join node must satisfy the DeleteKindTrait.DELETE_BY_KEY.

Compatibility, Deprecation, and Migration Plan

These APIs are newly introduced and do not involve compatibility issues.

Test Plan

Harness tests and IT tests will be added to validate the changes.

Future Work

  • Support for a variety of index types, such as UNIQUE, to optimize the structure of the cache.

  • Support for cascading joins.

  • Support for batch-stream fusion for Delta Join operator to accelerate job restart and recovery.

  • Support for Delta Join with strong consistency.

Rejected Alternatives

Using Primary Key Without Introducing New Index POJO

To determine whether to optimize into a delta join, we could evaluate whether the join key includes the primary key, without introducing a new Index POJO.

However, in practical business scenarios, it is relatively uncommon for the join key to consistently include the primary key. Additionally, in a delta join, we need to focus on retrieving columns that enable efficient querying of the table data, which may not always align with the primary key.

Support Sync LookupTableSource

Besides the async LookupTableSource, the sync LookupTableSource could also be supported.

However, considering the need to align performance with streaming joins, the benefits of using a sync LookupTableSource are not significant.

References

[1] FLIP-519: Introduce async lookup key ordered mode

