Motivation

In many streaming applications, particularly real-time analytics and monitoring systems, it is valuable to obtain partial results early rather than wait for join results to be finalized. For Flink SQL interval joins, results for rows without a match (the NULL-padded rows of outer joins) are delayed until watermarks guarantee that no more matches can occur. This delay is a problem for scenarios that require fast feedback. Early fire support addresses this by emitting intermediate results speculatively and later correcting them with retractions or updates, so the final result is still correct. This approach can, however, turn an append-only stream into an updating stream, so the planner and downstream operators must be able to handle retraction/upsert messages.

Public Interfaces

Hints for Early Fire

Users can enable and configure early fire support using SQL hints. The following hint syntax is proposed:

SELECT /*+ EARLY_FIRE('delay'='5s','time_mode'='rowtime|proctime') */
    A.id, B.value
FROM A
JOIN B
ON B.rowtime BETWEEN A.rowtime - INTERVAL '1' HOUR AND A.rowtime + INTERVAL '1' HOUR;
  • DELAY: Specifies the initial delay (in time units such as ms, s, min or h) before the first early fire. Must be > 0.
  • TIME_MODE (optional): Determines whether early-fire timers use event time or processing time.
    • If omitted, defaults to:
      • ROWTIME if it is an event-time interval join
      • PROCTIME if it is a processing-time interval join
    • If set to ROWTIME, the early fire timers use event time.
      • ROWTIME is only valid for event-time interval joins. For a processing-time interval join, the planner throws an exception. Sample error message: “'TIME_MODE=ROWTIME' is not supported for processing-time interval joins. Please use 'TIME_MODE=PROCTIME' or remove this option.”
    • If set to PROCTIME, the early fire timers use wall-clock (processing) time. For instance, a user might force processing-time early fires on an event-time interval join if they want partial results to follow wall-clock time rather than event time/watermarks.
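The defaulting and validation rules for TIME_MODE can be sketched as a small pure function. This is a hypothetical helper (the class EarlyFireTimeMode and method resolve are invented for illustration, not existing Flink code). It uses IllegalArgumentException to stay self-contained. A real planner implementation would more likely throw Flink's ValidationException.

```java
import java.util.Locale;

/** Hypothetical planner-side helper (not an existing Flink class) for the 'time_mode' option. */
final class EarlyFireTimeMode {

    enum TimeMode { ROWTIME, PROCTIME }

    private EarlyFireTimeMode() {}

    /**
     * Resolves the effective early-fire time mode.
     *
     * @param hintValue     value of 'time_mode' from the hint, or null if the option is omitted
     * @param eventTimeJoin true if the interval join is defined on rowtime attributes
     */
    static TimeMode resolve(String hintValue, boolean eventTimeJoin) {
        if (hintValue == null) {
            // Omitted: follow the join's own time attribute.
            return eventTimeJoin ? TimeMode.ROWTIME : TimeMode.PROCTIME;
        }
        TimeMode requested;
        try {
            requested = TimeMode.valueOf(hintValue.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                    "Unsupported 'time_mode' value '" + hintValue
                            + "'. Expected 'rowtime' or 'proctime'.", e);
        }
        if (requested == TimeMode.ROWTIME && !eventTimeJoin) {
            // A processing-time join has no event-time semantics to drive rowtime timers.
            throw new IllegalArgumentException(
                    "'TIME_MODE=ROWTIME' is not supported for processing-time interval joins. "
                            + "Please use 'TIME_MODE=PROCTIME' or remove this option.");
        }
        // PROCTIME is accepted for both event-time and processing-time joins.
        return requested;
    }
}
```

Only one combination, ROWTIME on a processing-time join, is rejected. Every other combination resolves to a concrete mode.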

Alignment With Existing Early-Fire Configurations

Flink already provides configurations like table.exec.emit.early-fire.enabled and table.exec.emit.early-fire.delay primarily for window operators. In this FLIP, we introduce a hint-based approach specifically for interval join. To avoid user confusion:

  • We will investigate aligning or unifying these settings with the existing early-fire configuration.
  • We will keep the hint-based approach for interval-join-specific logic, but document how it relates to the table.exec.emit.* options used by window operators.

Granularity of the Early Fire Hint

When a single SQL query block contains multiple operators (for example, multiple interval joins or aggregations), the EARLY_FIRE hint might only be intended for a specific operator. This section defines how to scope the hint so that only the desired operator is affected.

Multiple-Joins Example

Consider the following query with two interval joins:

SELECT /*+ EARLY_FIRE('delay'='5s', 'time_mode'='rowtime') */
      o.*,
      s.*,
      d.*
FROM Orders o
LEFT JOIN Shipments s
    ON o.id = s.order_id
    AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time
RIGHT JOIN Delivery d
    ON d.delivery_id = s.delivery_id
    AND d.delivery_time BETWEEN s.ship_time AND s.ship_time + INTERVAL '12' HOUR;

Here, there are two joins:

  1. Orders ↔ Shipments (interval join)
  2. Shipments ↔ Delivery (interval join)

Placing the EARLY_FIRE hint once at the top of the query makes it ambiguous which join the hint applies to. By default, the planner applies the hint to the first interval join (i.e., the Orders ↔ Shipments join) and ignores it for subsequent joins.

Splitting Queries for Fine-Grained Control

To give each interval join its own early-fire setting (for example, early firing on only one of the joins), the user would typically split the query into two stages:

-- Stage 1: Apply EARLY_FIRE to the first join
CREATE VIEW OrdersWithShipments AS
SELECT /*+ EARLY_FIRE('delay'='5s', 'time_mode'='rowtime') */
      o.*,
      s.*
FROM Orders o
LEFT JOIN Shipments s
    ON o.id = s.order_id
    AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time;

-- Stage 2: Perform the second join without EARLY_FIRE (or with a different setting)
SELECT ows.*, d.*
FROM OrdersWithShipments ows
RIGHT JOIN Delivery d
    ON d.delivery_id = ows.delivery_id
    AND d.delivery_time BETWEEN ows.ship_time AND ows.ship_time + INTERVAL '12' HOUR;

This two-stage approach ensures each interval join (or operator) can have its own EARLY_FIRE configuration.

Additional Operators (Joins, Aggregations)

When a query contains multiple join operators, aggregations, or other transformations, it can be unclear which operator is subject to the EARLY_FIRE behavior. For example:

SELECT /*+ EARLY_FIRE('delay'='5s', 'time_mode'='rowtime') */ window_start, window_end, buyer, COUNT(*)
FROM Orders o
LEFT JOIN Shipments s ON o.id = s.order_id
-- Another join or transformation could follow
GROUP BY o.window_start, o.window_end, buyer;

Because EARLY_FIRE is proposed only for interval joins (as of Flink 1.20, existing query hints such as the join hints likewise target specific operator types), the single hint in this example applies to at most one join operator and never to the aggregation. If multiple operators need different early-fire settings, place each one in its own query or view so there is no ambiguity.

-- First apply EARLY_FIRE to the first join in a separate SELECT
CREATE VIEW OrdersWithEarlyFire AS
SELECT /*+ EARLY_FIRE('delay'='5s', 'time_mode'='rowtime') */ *
FROM Orders o
LEFT JOIN Shipments s ON ...

-- Then run a subsequent query (with or without another early-fire hint)
SELECT window_start, window_end, buyer, COUNT(*)
FROM OrdersWithEarlyFire ...
GROUP BY window_start, window_end, buyer;

Summary

  • EARLY_FIRE is scoped to a single operator. If multiple operators in one query can support early firing, Flink applies the hint to the first matching operator and logs a warning to highlight the ambiguity. Subsequent operators will not receive the hint.
  • Split queries for different early-fire settings. When multiple operators require separate early-fire configurations (or if you want to avoid ambiguity), split the SQL into separate statements (or views). This approach ensures each operator can have its own distinct EARLY_FIRE hint.
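The "first matching operator wins" rule can be sketched as a single pass over the candidate operators. The sketch assumes the operators are visited in the order their joins appear in the query. The Operator class and applyToFirstEligible method are hypothetical stand-ins for planner RelNodes and the hint-propagation step, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of EARLY_FIRE scoping; not Flink planner code. */
final class EarlyFireScoping {

    /** Minimal stand-in for a plan node that may or may not support early firing. */
    static final class Operator {
        final String name;
        final boolean supportsEarlyFire;
        boolean earlyFireApplied;

        Operator(String name, boolean supportsEarlyFire) {
            this.name = name;
            this.supportsEarlyFire = supportsEarlyFire;
        }
    }

    private EarlyFireScoping() {}

    /** Applies the hint to the first eligible operator and returns one warning per ignored operator. */
    static List<String> applyToFirstEligible(List<Operator> operatorsInQueryOrder) {
        List<String> warnings = new ArrayList<>();
        Operator target = null;
        for (Operator op : operatorsInQueryOrder) {
            if (!op.supportsEarlyFire) {
                continue; // e.g. aggregations: the hint never applies to them
            }
            if (target == null) {
                target = op;
                op.earlyFireApplied = true;
            } else {
                warnings.add("EARLY_FIRE was applied to '" + target.name + "' and ignored for '"
                        + op.name + "'. Split the query into views to configure each operator separately.");
            }
        }
        return warnings;
    }
}
```

For the Orders/Shipments/Delivery query, this marks only the Orders ↔ Shipments join and produces one warning for the Shipments ↔ Delivery join.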

Planner Changes

Handling Hints:

  • Update StreamPhysicalIntervalJoinRule to parse and propagate early fire hints (RelHint) to the corresponding physical node (StreamPhysicalIntervalJoin).
    • Add support for parsing EARLY_FIRE hints in StreamPhysicalIntervalJoinRule.
    • Attach the early fire configuration (delay, time_mode) to the StreamPhysicalIntervalJoin node.
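Parsing the hint could produce a small immutable value object that travels from StreamPhysicalIntervalJoin to StreamExecIntervalJoin. In this sketch, EarlyFireSpec and fromHintOptions are hypothetical names. The input map stands in for the hint's key/value options (in Calcite these are exposed as RelHint.kvOptions), and the sketch assumes lowercase keys. It accepts the units listed for DELAY (ms, s, min, h) and enforces delay > 0.

```java
import java.time.Duration;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical early-fire configuration carried by the physical/exec nodes; not existing Flink code. */
final class EarlyFireSpec {

    // Amount followed by one of the units documented for DELAY, e.g. "500ms", "5s", "10min", "1h".
    private static final Pattern DURATION = Pattern.compile("(\\d+)\\s*(ms|s|min|h)");

    final Duration delay;
    final String timeMode; // null means "use the join's default time mode"

    private EarlyFireSpec(Duration delay, String timeMode) {
        this.delay = delay;
        this.timeMode = timeMode;
    }

    /** Builds a spec from hint options such as {delay=5s, time_mode=rowtime}. */
    static EarlyFireSpec fromHintOptions(Map<String, String> options) {
        String rawDelay = options.get("delay");
        if (rawDelay == null) {
            throw new IllegalArgumentException("EARLY_FIRE requires a 'delay' option.");
        }
        Duration delay = parseDuration(rawDelay);
        if (delay.isZero() || delay.isNegative()) {
            throw new IllegalArgumentException("EARLY_FIRE 'delay' must be > 0, got: " + rawDelay);
        }
        return new EarlyFireSpec(delay, options.get("time_mode"));
    }

    static Duration parseDuration(String text) {
        Matcher m = DURATION.matcher(text.trim().toLowerCase(Locale.ROOT));
        if (!m.matches()) {
            throw new IllegalArgumentException("Invalid 'delay' value: " + text);
        }
        long amount = Long.parseLong(m.group(1));
        switch (m.group(2)) {
            case "ms":
                return Duration.ofMillis(amount);
            case "s":
                return Duration.ofSeconds(amount);
            case "min":
                return Duration.ofMinutes(amount);
            default: // "h"
                return Duration.ofHours(amount);
        }
    }
}
```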

Propagation:

  • Ensure StreamPhysicalIntervalJoin carries the early fire settings into the execution phase via StreamExecIntervalJoin.

Planner Awareness:

  • Outer joins with early firing can switch from append-only to retraction/upsert mode, requiring updates to FlinkChangelogModeInferenceProgram so downstream operators know to handle retractions.
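The changelog rules that follow from this (listed under Code Changes for FlinkChangelogModeInferenceProgram) can be modeled in isolation. The enum constants below mirror the planner's ModifyKind and UpdateKind trait names, but the class IntervalJoinChangelog and its methods are a hypothetical stand-alone model, not planner code. The sketch encodes the sink's needs as a single boolean for simplicity.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical model of the interval join's changelog contract; not FlinkChangelogModeInferenceProgram itself. */
final class IntervalJoinChangelog {

    enum ModifyKind { INSERT, UPDATE, DELETE }

    enum UpdateKind { NONE, ONLY_UPDATE_AFTER, BEFORE_AND_AFTER }

    private IntervalJoinChangelog() {}

    /** Early firing => ALL_CHANGES; otherwise INSERT_ONLY. */
    static Set<ModifyKind> outputModifyKinds(boolean earlyFireEnabled) {
        return earlyFireEnabled ? EnumSet.allOf(ModifyKind.class) : EnumSet.of(ModifyKind.INSERT);
    }

    /** The update kind is NONE without early firing; otherwise it follows what the downstream sink needs. */
    static UpdateKind outputUpdateKind(boolean earlyFireEnabled, boolean sinkNeedsUpdateBefore) {
        if (!earlyFireEnabled) {
            return UpdateKind.NONE;
        }
        return sinkNeedsUpdateBefore ? UpdateKind.BEFORE_AND_AFTER : UpdateKind.ONLY_UPDATE_AFTER;
    }

    /** Regardless of early firing, both inputs must be insert-only. */
    static void checkInput(Set<ModifyKind> inputKinds) {
        if (!EnumSet.of(ModifyKind.INSERT).equals(inputKinds)) {
            throw new IllegalStateException("IntervalJoin requires insert-only inputs, got " + inputKinds);
        }
    }
}
```

The asymmetry is deliberate. Early firing changes what the join emits, but it does not relax what the join accepts.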

Runtime Changes

StreamExecIntervalJoin Modifications:

  • Modify StreamExecIntervalJoin to support early firing. Introduce two new parameters: earlyFireDelay and earlyFireTimeMode.

State Management:

  • Extend state to store metadata about early-fired events:
    • Track whether an event has already been early-fired (to prevent duplicate emissions).
    • Maintain retraction information for speculative results.
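The extra per-row metadata can be as small as two flags stored next to each buffered row. This is a hypothetical sketch: EarlyFireRowState and its methods are invented names. It assumes the speculative NULL-padded row can be rebuilt from the buffered row itself, so storing the flags is enough to retract it later.

```java
/** Hypothetical per-row early-fire metadata kept alongside each buffered row; not Flink code. */
final class EarlyFireRowState {

    private boolean matched;    // at least one real join result has been emitted for this row
    private boolean earlyFired; // a speculative NULL-padded result has been emitted

    /**
     * Called when this row's early-fire timer fires.
     *
     * @return true if a NULL-padded result must be emitted now
     */
    boolean onEarlyFireTimer() {
        if (matched || earlyFired) {
            return false; // already matched, or already emitted: prevents duplicate emissions
        }
        earlyFired = true;
        return true;
    }

    /**
     * Called when a matching row from the other side arrives.
     *
     * @return true if the previously emitted NULL-padded result must be retracted first
     */
    boolean onMatch() {
        boolean retractPadded = earlyFired && !matched;
        matched = true;
        return retractPadded;
    }
}
```

The retraction is triggered only by the first match. Later matches for the same row emit ordinary join results.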

Event Emission Logic:

  • Inner Join:
    • Matches are already emitted immediately upon detection. Early fire logic is unnecessary.
    • No state changes needed.
  • Left Outer Join:
    • Emit (A, NULL) early if A has no match at the time of the early fire.
    • If a matching B arrives after the early fire (while still within the join interval), retract (A, NULL) and emit (A, B).
  • Right Outer Join:
    • Symmetric to the left outer join: emit (NULL, B) early for unmatched B rows.
    • If a matching A arrives after the early fire, retract (NULL, B) and emit (A, B).
  • Full Outer Join:
    • Combines the logic of left and right joins for unmatched rows on both sides.
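The left outer emission rules can be traced with a small single-key model. This is a hypothetical sketch: the class LeftOuterEarlyFireModel, its methods, and the string labels INSERT/RETRACT are illustrative only. The sketch does not choose a concrete Flink RowKind encoding for the retraction. It assumes a symmetric bound |A.time - B.time| <= bound, as in the ±1 hour example above, and omits state cleanup. The right outer case is the mirror image, and full outer combines both.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical single-key model of left-outer early-fire emission; not Flink operator code. */
final class LeftOuterEarlyFireModel {

    private static final class LeftRow {
        final String id;
        final long time;
        boolean matched;    // at least one (A, B) result has been emitted
        boolean earlyFired; // a speculative (A, NULL) result has been emitted

        LeftRow(String id, long time) {
            this.id = id;
            this.time = time;
        }
    }

    private static final class RightRow {
        final String id;
        final long time;

        RightRow(String id, long time) {
            this.id = id;
            this.time = time;
        }
    }

    private final long boundMillis; // join condition: |A.time - B.time| <= boundMillis
    private final List<LeftRow> lefts = new ArrayList<>();
    private final List<RightRow> rights = new ArrayList<>();
    final List<String> output = new ArrayList<>();

    LeftOuterEarlyFireModel(long boundMillis) {
        this.boundMillis = boundMillis;
    }

    private boolean inInterval(long leftTime, long rightTime) {
        return Math.abs(leftTime - rightTime) <= boundMillis;
    }

    void onLeft(String id, long time) {
        LeftRow a = new LeftRow(id, time);
        for (RightRow b : rights) {
            if (inInterval(time, b.time)) {
                a.matched = true;
                output.add("INSERT (" + a.id + ", " + b.id + ")");
            }
        }
        lefts.add(a);
    }

    void onRight(String id, long time) {
        rights.add(new RightRow(id, time));
        for (LeftRow a : lefts) {
            if (!inInterval(a.time, time)) {
                continue;
            }
            if (a.earlyFired && !a.matched) {
                // Correct the speculative result before emitting the real match.
                output.add("RETRACT (" + a.id + ", NULL)");
            }
            a.matched = true;
            output.add("INSERT (" + a.id + ", " + id + ")");
        }
    }

    /** Early fire: emit (A, NULL) once for every buffered left row that is still unmatched. */
    void onEarlyFire() {
        for (LeftRow a : lefts) {
            if (!a.matched && !a.earlyFired) {
                a.earlyFired = true;
                output.add("INSERT (" + a.id + ", NULL)");
            }
        }
    }
}
```

For a1 at t=0, an early fire, then b1 at t=1s, the model emits (a1, NULL), retracts it, and then emits (a1, b1).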

Code Changes

Key Code Paths to Modify:

  • FlinkChangelogModeInferenceProgram:
    • Check whether early firing is enabled, i.e., whether an EARLY_FIRE hint is attached to the interval join.
    • When early firing is enabled:
      • ModifyKindSetTrait of IntervalJoin is set to ALL_CHANGES.
      • UpdateKind can be:
        • ONLY_UPDATE_AFTER, if the downstream sink only needs the latest value per key (e.g., an upsert sink that does not consume before-updates).
        • BEFORE_AND_AFTER, if the downstream sink requires both before- and after-updates (e.g., a retract sink).
        • The choice depends on the requirements of the downstream operators and sink. For example, if the sink requires both “before” and “after” updates, BEFORE_AND_AFTER is used; otherwise ONLY_UPDATE_AFTER is sufficient.
    • When early firing is disabled:
      • ModifyKindSetTrait of IntervalJoin is set to INSERT_ONLY (i.e., no retractions or updates are produced).
      • UpdateKind of IntervalJoin is set to NONE.
    • IntervalJoin Input Requirements (regardless of early firing setting):
      • The inputs to an IntervalJoin must be insert-only (ModifyKindSetTrait INSERT_ONLY, UpdateKind NONE). In other words, IntervalJoin does not accept retractions or updates from its upstream operators.
    • This modification ensures that:
      • Downstream operators are aware they might receive retractions / updates
      • The planner can properly handle the changelog mode for early firing interval joins
  • StreamPhysicalIntervalJoinRule:
    • Parse EARLY_FIRE hint and attach configurations to the StreamPhysicalIntervalJoin node.
  • StreamPhysicalIntervalJoin:
    • Propagate earlyFireDelay and earlyFireTimeMode to StreamExecIntervalJoin.
  • TimeIntervalJoin:
    • Introduce early fire timers.
      • Register a timer for each unmatched row to schedule its early fire after earlyFireDelay.
      • Each row is early-fired at most once: no further timers are registered after the first fire (periodic re-firing via an interval option is listed under Rejected Alternatives).
    • Modify state management to track early-fired events.
    • Implement retraction logic for correcting early-fired results.
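The early-fire timers can be modeled with an ordered map from fire time to row IDs. In the real TimeIntervalJoin, a KeyedCoProcessFunction, this would presumably map onto the context's TimerService (registerEventTimeTimer for ROWTIME, registerProcessingTimeTimer for PROCTIME). The class EarlyFireTimers and its methods are hypothetical, and the sketch uses a TreeMap so that it runs on its own. It assumes the base time is the row's event timestamp under ROWTIME, or its arrival wall-clock time under PROCTIME.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for the early-fire timers TimeIntervalJoin would register; not Flink code. */
final class EarlyFireTimers {

    private final long delayMillis;
    private final TreeMap<Long, List<String>> timers = new TreeMap<>();

    EarlyFireTimers(long delayMillis) {
        if (delayMillis <= 0) {
            throw new IllegalArgumentException("earlyFireDelay must be > 0");
        }
        this.delayMillis = delayMillis;
    }

    /** Schedules exactly one early fire for a newly buffered, still unmatched row. */
    long register(String rowId, long baseTime) {
        long fireAt = baseTime + delayMillis;
        timers.computeIfAbsent(fireAt, t -> new ArrayList<>()).add(rowId);
        return fireAt;
    }

    /**
     * Advances time (the watermark for ROWTIME, the wall clock for PROCTIME) and returns the rows
     * whose timers are due, in fire-time order. Each caller should then check the row's
     * matched/earlyFired flags before emitting anything.
     */
    List<String> advanceTo(long currentTime) {
        List<String> fired = new ArrayList<>();
        Map<Long, List<String>> due = timers.headMap(currentTime, true);
        for (List<String> rows : due.values()) {
            fired.addAll(rows);
        }
        due.clear(); // removes the due timers from the backing map; nothing is re-registered
        return fired;
    }
}
```

Because due timers are removed and never re-registered, each row fires at most once, which is consistent with the rejection of the periodic interval option.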

Compatibility, Deprecation, and Migration Plan

This feature is backward compatible:

  • Default behavior remains unchanged unless the EARLY_FIRE hint is explicitly used.
  • No impact on existing interval join queries without early fire hints.

Test Plan

Unit Tests:

  • Verify that early fire hints are correctly parsed and propagated through the planner.
  • Validate emission behavior for each join type (inner, left, right, full outer).

Integration Tests:

  • Simulate out-of-order events and verify that early-fired results are retracted/updated as expected.

End-to-End Tests:

  • Use a streaming job with interval joins to verify correctness with watermarks, late events, and speculative emissions.

Rejected Alternatives

  • An 'interval' hint option (optional) that specifies the time span between subsequent early fires. If not set, the early fire occurs only once, after DELAY. Must be > 0.
  • Immediate Early Firing Without Correction:
    • This approach would violate consistency guarantees and was rejected.
  • Ignore and fall back when a user sets TIME_MODE='ROWTIME' on a processing-time interval join:
    • The planner logs a warning and uses TIME_MODE='PROCTIME' instead of failing.
    • Pros: Keeps the query running in some consistent manner (still performs early firing, but by processing time).
    • Cons: User may not realize their configuration has been overridden unless they notice the warning.
  • Count-based early trigger. This is an early firing strategy that triggers output as soon as a certain number of elements have arrived in the current interval (i.e., before the interval is closed by the watermark).
  • Granularity of the Early Fire Hint: have the planner warn or fail if it detects more than one eligible operator in the same statement, unless a more specific rule or user configuration exists.