Discussion thread: https://lists.apache.org/thread/9b9cy8o2qjt7w2n7j0g4bbrwvy9n61xv
Vote thread: Accepted: https://lists.apache.org/thread/3z70lcpotdx6785dpsrvmncwyfb5mnnw
JIRA

FLINK-37481

Release: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This FLIP proposes a multi-way streaming join operator able to perform regular joins across multiple tables when they are joined on a common join key (or set of keys). Currently, regular joins (non-temporal inner joins, left joins, right joins and full joins) across multiple tables are executed as a chain of binary join operators. Each binary join produces results (we’ll call these intermediate state) which must be stored in Flink's state indefinitely. For complex queries with many joins, or joins involving large tables with low selectivity, this intermediate state can become excessively large. This "big state issue" leads to several problems for users:

  1. Performance Degradation: Large state increases checkpointing times, recovery times, and can lead to slower record processing due to state access overhead.

  2. Resource Consumption: Significant state requires more disk space and potentially more memory, increasing operational costs.

  3. Operational Instability: Jobs with very large state are more prone to backpressure issues and failures.

This proposal introduces a dedicated multi-way join operator (StreamingMultiJoinOperator) designed to mitigate these issues by processing joins across multiple input streams simultaneously, thereby eliminating the need to store intermediate join results in state. While this "zero intermediate state" approach prioritizes state reduction, it may involve more computation compared to optimally reordered binary joins in scenarios where intermediate results would naturally be very small. However, it offers significant benefits in the common cases where intermediate state growth is a problem.
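To make the state difference concrete, here is a small illustrative sketch (plain Python, not Flink code; all names are hypothetical) comparing the rows a chain of binary joins must keep versus what the multi-way operator keeps, for a three-way inner join on a common key:

```python
# Toy model: a binary chain A JOIN B JOIN C must store each input
# plus the intermediate result of A JOIN B; the multi-way operator
# stores only the raw input records.

def binary_chain_state(a, b, c):
    # intermediate state: every matching (A, B) pair is materialized
    ab = [(x, y) for x in a for y in b if x["k"] == y["k"]]
    return len(a) + len(b) + len(c) + len(ab)

def multi_join_state(a, b, c):
    # only the input records themselves are stored
    return len(a) + len(b) + len(c)

# 100 rows per input, all sharing one join key (low selectivity)
a = [{"k": 1}] * 100
b = [{"k": 1}] * 100
c = [{"k": 1}] * 100

print(binary_chain_state(a, b, c))  # 10300: 300 input rows + 10000 intermediate rows
print(multi_join_state(a, b, c))    # 300
```

The gap widens with each additional join in the chain, since every intermediate result feeds the next binary join's state.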

  • Important restrictions: although this FLIP aims to improve regular multi-way joins for all use cases, the implementation of a multi-way join is complex and has intrinsic technical limitations that do not allow it to be applied to all cases. We can only cover part of the use cases supported by binary joins, but we believe these address relevant, long-standing problems: regular non-temporal inner, left and right outer joins that share the same join key and grow state indefinitely. More on this under Limitations.

Public Interfaces

The primary public-facing changes are configuration options to control the activation and scope of this optimization:

  1. New Configuration Options

    • table.optimizer.multi-join.enabled: (Boolean, default: false) Enables the optimizer rule that replaces eligible chains of binary joins with the multi-way join operator.

    • table.optimizer.multi-join.max-tables: (Integer, default: TBD, e.g., not set or a fixed number to be determined while benchmarking) Sets a limit on the maximum number of inputs (tables) that can be combined into a single multi-way join operator. This allows users to pick the granularity of "how large" they want their multi-way join operator to be. One multi-join operator with 20 inputs could be broken into two with 10 inputs each, for example. This is useful when one single operator becomes overloaded, which might happen if there is data skew on one key or in other extreme cases.
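The splitting behavior described above can be sketched as follows (an illustrative assumption about the planner's grouping, not the actual optimizer-rule code):

```python
# Hypothetical sketch of how `table.optimizer.multi-join.max-tables`
# could bound operator size: a chain of N join inputs is split into
# groups of at most `max_tables` inputs, each group becoming one
# multi-join operator. The real logic lives in the optimizer rules.

def split_into_operators(inputs, max_tables):
    return [inputs[i:i + max_tables] for i in range(0, len(inputs), max_tables)]

inputs = [f"t{i}" for i in range(20)]          # a 20-table join chain
operators = split_into_operators(inputs, max_tables=10)
print(len(operators))     # 2 multi-join operators
print(len(operators[0]))  # 10 inputs each
```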

The change affects the physical execution plan generated by the SQL optimizer.

Proposed Changes

We propose introducing a new physical stream operator, StreamingMultiJoinOperator, designed to handle joins between three or more input streams within a single operator instance. A draft PR for the operator is already available: https://github.com/apache/flink/pull/26313

  1. Operator State

    1. We only store the input records. That means, if we are joining 5 tables, we store only the records for each of the 5 tables.

    2. No intermediate results are stored.

    3. The operator operates on a keyed state store. Records in an operator are partitioned by at least one common join key.
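The state layout above can be pictured as one keyed view per input. The following is a minimal sketch of that shape (hypothetical names; not the actual JoinRecordStateView API or Flink's keyed-state backend):

```python
# Illustrative model: each of the N inputs gets its own state view;
# within a view, records are keyed by the common join key.
from collections import defaultdict

class PerInputState:
    def __init__(self, num_inputs):
        # one map per input: join_key -> list of stored records
        self.views = [defaultdict(list) for _ in range(num_inputs)]

    def add(self, input_id, join_key, record):
        self.views[input_id][join_key].append(record)

    def retract(self, input_id, join_key, record):
        self.views[input_id][join_key].remove(record)

    def records(self, input_id, join_key):
        return self.views[input_id][join_key]

state = PerInputState(num_inputs=3)
state.add(0, "k1", ("orders", 42))
state.add(1, "k1", ("payments", 42))
print(state.records(0, "k1"))  # [('orders', 42)]
```

Note that nothing beyond the raw input records is ever written: no joined combinations appear in state.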

  2. Operator Logic

    • Given an incoming record on any of the N joined inputs, the operator performs the join across all inputs. If there's a match across all join conditions, we emit the corresponding rows.
    • It correctly handles retractions (DELETE, UPDATE_BEFORE) and appends (INSERT, UPDATE_AFTER) to maintain consistent results in streaming scenarios. This was one of the most complex parts to get right without relying on intermediate state. After multiple iterations, we have a consistent algorithm that does not require intermediate state: we calculate in memory, on the fly, for the current combination of rows, how many associations we already had. The reprocessing is necessary, and it is most efficient when users join on primary keys, since we'd then have one record returned from state for each input.

    • It maintains state for each input stream independently using JoinRecordStateView. This state stores the records received from that specific input, keyed by the join key(s).

    • It supports INNER and LEFT OUTER joins between the inputs.

    • A more detailed description on how the operator works with examples is in the Javadoc in the operator PR.
    • State TTL: we automatically clean up state for records that fall outside the specified retention time (stateRetentionTime) for the input before performing a join. More precisely, it uses the same logic as the current StreamingJoinOperator: one state TTL for each input.

    • Watermarks: we use the default implementation of AbstractStreamOperatorV2 for watermark handling. In short, watermarks are not relevant for the operator logic itself since we’re only dealing with non-temporal regular joins.
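To illustrate the probing an incoming record triggers, here is a deliberately minimal sketch (assuming equi-joins on one common key and INNER join semantics only; the real operator additionally handles LEFT joins, retractions and the association counting described above, and all names here are hypothetical):

```python
# On each incoming record: store it in its input's state, then do a
# depth-first walk over the other inputs' state, emitting every
# combination that has a row from all N inputs for the join key.

def on_record(state, num_inputs, input_id, join_key, record, out):
    """state: list of dicts, one per input: join_key -> list of records."""
    state[input_id].setdefault(join_key, []).append(record)

    def recurse(depth, current):
        if depth == num_inputs:            # matched a row from every input
            out.append(tuple(current))
            return
        if depth == input_id:              # fix the incoming record in place
            recurse(depth + 1, current + [record])
            return
        for stored in state[depth].get(join_key, []):  # probe this input's state
            recurse(depth + 1, current + [stored])

    recurse(0, [])

state = [{}, {}, {}]
results = []
on_record(state, 3, 0, "k", "a1", results)   # no match yet: inputs 1 and 2 empty
on_record(state, 3, 1, "k", "b1", results)   # still no match: input 2 empty
on_record(state, 3, 2, "k", "c1", results)   # completes the join
print(results)  # [('a1', 'b1', 'c1')]
```

Because only input records are stored, every emission is recomputed from the base inputs on each incoming record; this is the computation/state tradeoff discussed in the Motivation.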

  3. Optimizer Integration

    • Apart from the operator, there are multiple steps necessary for us to instantiate it properly: extend FlinkJoinToMultiJoinRule, add configuration options, add a StreamPhysicalMultiJoin physical node, add a StreamExecMultiJoin exec node and other modifications. If table.optimizer.multi-join.enabled is true, we’ll move on with a LogicalFlinkMultiJoin that will eventually be mapped to the operator. These changes will be addressed in individual PRs.

  4. Benefits Realized

    • Zero Intermediate State: Eliminates state required for intermediate join results.

    • Reduced Changelog Normalization: Consolidates multiple joins, potentially reducing the number of ChangelogNormalize operators needed, further saving state and computation.

    • Late Materialization: Projections and calculations on joined rows are only performed for combinations that satisfy all relevant join conditions across the participating tables, avoiding computation on intermediate results that would later be filtered out.

    • Fewer writes to state: Intermediate results do not have to be written to state, which is expensive. The tradeoff is recomputing intermediate results for left joins; computation is considerably reduced when joining on primary keys, which is the usual case.

Limitations

  • Common Partitioning Key Required

    •  For the StreamingMultiJoinOperator to function correctly without requiring full data broadcasting, all inputs must be partitionable by a single common join key. This is a natural technical limitation due to partitioning. It means the join conditions across all tables involved in the multi-way join must transitively relate to the same key attribute(s).

      • Example supported: A JOIN B ON A.key = B.key JOIN C ON A.key = C.key (Partition by key)

      • Example supported (tentative): A JOIN B ON A.key = B.key JOIN C ON B.key = C.key (Partition by key via transitivity)

      • Example NOT Supported: A JOIN B ON A.key1 = B.key1 JOIN C ON B.key2 = C.key2 (No single key allows partitioning A, B, and C together for this join sequence). Since we’re in a distributed context, we need to make sure all relevant records land in the same operator instance. For this case, currently, that’s not possible from a technical perspective. Queries like this will fall back to using standard binary join operators.
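The partitioning requirement above can be made concrete with a tiny sketch (illustrative only; Flink's actual key-group routing differs in detail): hashing the single common key routes matching records from every input to the same parallel instance, whereas in the unsupported shape B would need to be partitioned by key1 (to meet A) and by key2 (to meet C) at the same time.

```python
# Routing by a single common join key: records from A, B and C that
# share the key all land on the same parallel operator instance.

def route(join_key, parallelism):
    return hash(join_key) % parallelism

parallelism = 4
k = "customer_42"
targets = {route(k, parallelism) for _ in ("A", "B", "C")}
print(len(targets))  # 1: all three inputs meet on one instance
```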

  • Supported Join Types

    • This initial implementation focuses on optimizing chains containing INNER JOIN and LEFT OUTER JOIN.

      • RIGHT OUTER JOIN: the current optimizer rules (JoinToMultiJoinRule) do not flatten RIGHT joins into the multi-join. Supporting RIGHT joins would require enhancing the optimizer rules to rewrite them as LEFT joins (by swapping inputs) before multi-join planning, which is a common practice and should be doable! This will be done as part of another ticket.

      • FULL OUTER JOIN: Currently not supported by Calcite's MultiJoin and optimizer rules. Specifically, it’s not clear if a multi-way join could represent any improvement over binary joins due to increased complexity in handling symmetric null padding and retractions - it very likely requires intermediate state, potentially as much as binary joins require. Full outer joins will continue to use binary joins for now; addressing this would require further research and demonstrating an algorithm with clear performance and state benefits.

      • SEMI and ANTI joins are currently not supported by JoinToMultiJoinRule. That means they first require changes to the optimizer rule before they can land properly in the operator. The operator logic also has to be adapted to account for them. This can be picked up separately.

      • Queries containing unsupported join types (SEMI and ANTI, FULL OUTER) within a potential chain will either use standard binary operators for those specific joins or break the sequence into smaller multi-joins/binary joins.

  • Async state support

    • Similar to the regular binary StreamingJoinOperator, for which an AsyncStreamingJoinOperator was added, we also need to add an AsyncStreamingMultiJoinOperator for async state support. This can be picked up separately after the first end-to-end version of the operator with sync state has been finished.

Compatibility, Deprecation, and Migration Plan

  • Impact on Existing Users

    • By default (table.optimizer.multi-join.enabled=false), there is no impact; existing jobs will continue using chained binary joins.

    • If enabled, queries with 3+ joins meeting the criteria may be planned using the new operator. This will change the physical plan and resource usage profile (less state, potentially different CPU usage). The logical results of the query should remain identical.

    • After analyzing the stability of the operator, we can consider enabling this as default for the supported use cases in a second iteration.

  • Phasing

    •  The feature is opt-in via the configuration flag. The existing binary join implementation remains the default and fallback mechanism.

Test Plan

The implementation will be tested through:

    1. Operator harness tests: covering the internal logic of StreamingMultiJoinOperator, state handling, INNER/LEFT join condition evaluation, outer join null padding, and changelog consistency.

    2. SQL End-to-End Tests: Creating streaming SQL queries involving various combinations of 3+ tables with INNER and LEFT joins, specifically targeting:

      • Queries with a clear common partitioning key.

      • Queries lacking a common partitioning key (to ensure fallback to binary joins).

      • Queries involving FULL joins (to ensure fallback).

      • Verification of correct results compared to equivalent queries using binary joins.

      • Verification of correct plan generation (multi-join operator used only when criteria are met).

      • Handling of updates and retractions.

      • State size reduction compared to binary join plans for the supported queries.

      • At this point, I can also share some benchmarks with the community since we can compare end-to-end a chain of binary joins and a multi-join.
    3. Benchmark: For the supported cases we will certainly require considerably less state, and that is the main goal. Regarding runtime, it might be slower or faster than binary joins depending on the join order, selectivity, whether frequently updated tables are on the left, and other factors. Since the StreamingMultiJoinOperator replaces multiple StreamingJoinOperators, a meaningful operator-only benchmark joining N inputs is not possible. Therefore, after having a basic end-to-end version, I plan to perform benchmarking and share the results.

Rejected Alternatives

  • Status Quo (Chained Binary Joins): we could internally make the operator function like a chain of binary joins. However, that suffers from the same large intermediate state issues for complex join chains.

  • Multi-Join with Partial/Adaptive Intermediate State: We could theoretically decide at runtime to save a set of intermediate results in state if they’re small and often used (or based on other heuristics). This definitely has potential to optimize multi-way joins even further, but it is not in the scope of this FLIP, which focuses on a purely zero-intermediate-state approach for the supported patterns as a first end-to-end working version. Work on this can be done separately and doesn't have to affect the layout of the operator.

  • Broadcast Multi-Way Join: A multi-way join that broadcasts some or all inputs would remove the common partitioning key restriction but would have significantly different performance characteristics and network overhead. An interesting idea, but such a StreamingBroadcastMultiWayJoin would be a topic for another FLIP.

Recursive vs Iterative

To make sure it's documented: it’s important to quickly touch on why the logic in the join operator uses recursion instead of being iterative, and that the alternative was considered. An iterative approach would produce much longer and less readable code. Since we’re doing a depth-first search over an unknown number of N tables, we’d need to maintain some kind of explicit stack of states, and the logic flow would be less intuitive. In general, this is not a natural or fitting approach here.

Recursing through the tables naturally matches the tree-like depth-first exploration of join combinations, making the logic clearer. A common drawback to watch out for with recursive algorithms is hitting a StackOverflowError. This could theoretically happen for extremely deep joins, but it’s very unlikely here: the variables used in our recursiveMultiJoin are not memory-intensive and are mostly references to objects stored on the heap, not in the call stack. As a basic estimation per call frame: depth (int): 4 bytes; inputId (int): 4 bytes; input (ref): 4 bytes; currentRows (ref): 4 bytes; associations (ref): 4 bytes; phase (ref): 4 bytes; local variables isLeftJoin (boolean): ~1-4 bytes and matched (boolean): ~1-4 bytes. Rough total per call: ~40 to 64 bytes. With 10 MB available for a task manager's call stack, that budget would cover on the order of 200,000 recursive calls, far more inputs than any realistic query would ever join.
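A quick back-of-envelope check of that estimate (ignoring JVM frame-header overhead, which makes real frames somewhat larger but does not change the conclusion):

```python
# With one recursive call per input and ~40-64 bytes of frame-local
# data per call, a 10 MB stack budget bounds the recursion depth.
stack_budget = 10 * 1024 * 1024   # 10 MB

for frame_bytes in (40, 64):
    print(frame_bytes, "bytes/frame ->", stack_budget // frame_bytes, "frames")
# 40 bytes/frame -> 262144 frames
# 64 bytes/frame -> 163840 frames
```

Even with generous per-frame overhead factored in, the supported number of inputs remains orders of magnitude above any realistic multi-way join.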