Discussion thread: https://lists.apache.org/thread/j98dc6oylo9rl7wnz2fk7vrx3szfczx9
Vote thread: https://lists.apache.org/thread/9bnh056msp32nkjy2smpfzwk0yxbb5xr
JIRA: https://issues.apache.org/jira/browse/FLINK-39254
Release: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The ProcessTableFunction (PTF) feature has been well received by the Flink community and its adoption is increasing. Since FLIP-440 introduced many new APIs and concepts, some design decisions need minor adjustments, specifically around late data handling and lazy state access.

Also, talking to community members at Current and Flink Forward conferences has shown that broadcast state is crucial to bridge the gap to DataStream API applications for broadcast joining and rule-based logic.

This FLIP proposes 3 PTF improvements:

1) Don’t drop late data in ProcessTableFunction, as data loss is usually not intended; this is similar to DataStream API’s ProcessFunction

2) Introduce ValueView to enable a “supplier”-pattern for state access; similar to MapView and ListView

3) Introduce BROADCAST_SEMANTIC_TABLE as a new kind of argument to PTFs

Public Interfaces

For 1) Late data handling

-- Support of the ORDER BY clause in PTFs
SELECT * FROM MyPtf(input => TABLE t PARTITION BY k ORDER BY ts)

For 2) ValueView

ValueView.value(): T
ValueView.update(T value)
ValueView.isEmpty(): boolean
ValueView.clear()

For 3) broadcast support

ArgumentTrait.BROADCAST_SEMANTIC_TABLE
ArgumentTrait.NOTIFY_STATEFUL_SETS

StaticArgumentTrait.BROADCAST_SEMANTIC_TABLE
StaticArgumentTrait.NOTIFY_STATEFUL_SETS

StateHint.value(): StateKind
StateKind.PER_SET
StateKind.BROADCAST

StateTypeStrategy.isBroadcast(CallContext callContext): boolean

Proposed Changes

Late data handling

Currently, the PTF operator silently drops late data. While this behavior was specified in FLIP-440, it has proven to be a design flaw. We propose updating this behavior to allow late data to enter the PTF.

Silently dropping records contradicts standard SQL semantics, which are typically agnostic to event-time lateness. Furthermore, relying on strict watermarking is often impractical, as watermark strategies are frequently dictated by upstream teams or are too complex for end-users to manage.

Therefore, the PTF implementer should have the autonomy to decide how to handle late data. Some might drop it, some might handle it as a special case, and others might reroute it to a late-data side output. In most cases, however, users likely don’t care whether data is late because their implementation does not rely on event-time semantics.

Since PTFs are a relatively new feature and are marked as @PublicEvolving, we suggest performing the change “as is”. DataStream API’s ProcessFunction behaves similarly to what is proposed here, so the current PTF implementation could be considered “a bug”. It is likely that not many users have adopted PTFs yet, and even fewer have observed the late data handling behavior.

ORDER BY

As a backwards-compatible path, this FLIP also proposes supporting the ORDER BY clause for time attributes in PTF table arguments. Consider a query such as:

SELECT * FROM MyPtf(input => TABLE t PARTITION BY k ORDER BY ts)

For this query, a temporal sort operation is inserted, which leads to behavior similar to that of

MATCH_RECOGNIZE(PARTITION BY userid ORDER BY ts)

in StreamExecMatch which also drops late data.

The ORDER BY clause supports sorting on multiple columns (e.g. ORDER BY ts ASC, c2 ASC, c3 DESC); however, the first column must be a watermarked time attribute in ascending order. With multiple table arguments, the ORDER BY is executed per input table (as a sorting operator placed before the PTF operator).

Example

The following example illustrates the behavior of a PTF when handling late data:

-- Given:
CREATE TABLE t (ts TIMESTAMP(3), name STRING, score INT, WATERMARK FOR ts AS ts)

-- Input Data:
13:00, Bob, 12
13:00, Bob, 33
14:00, Bob, 50
12:00, Bob, 100

-- Function call:
SELECT * FROM IdentityPtf(input => TABLE t PARTITION BY name, on_time => ts)

-- Current Implementation Output (simplified):
-- Data is dropped
13:00, Bob, 12
14:00, Bob, 50

-- Proposed Output (simplified):
-- Just forward as you would expect it from an identity function
13:00, Bob, 12
13:00, Bob, 33
14:00, Bob, 50
12:00, Bob, 100

-- Function call:
SELECT * FROM IdentityPtf(input => TABLE t PARTITION BY name ORDER BY ts, on_time => ts)

-- Proposed Output (simplified):
-- People expect "best-effort sorting" on a stream of data,
-- thus missing data is more acceptable
13:00, Bob, 12
14:00, Bob, 50
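
The three outputs above can be reproduced with a small plain-Java simulation. This is only a sketch and not Flink code: the class LateDataSimulation and its methods are hypothetical, timestamps are modeled as hours, and the watermark is assumed to advance to the highest timestamp after every single record (mirroring WATERMARK FOR ts AS ts in the simplified example).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Plain-Java model of the late data example; not part of any Flink API. */
final class LateDataSimulation {

    /** One input row; the timestamp is modeled as the hour of day. */
    static final class Event {
        final int hour;
        final String name;
        final int score;

        Event(int hour, String name, int score) {
            this.hour = hour;
            this.name = name;
            this.score = score;
        }

        @Override
        public String toString() {
            return hour + ":00, " + name + ", " + score;
        }
    }

    /** Current behavior: rows at or behind the watermark are dropped. */
    static List<Event> dropLate(List<Event> input) {
        List<Event> out = new ArrayList<>();
        long watermark = Long.MIN_VALUE;
        for (Event e : input) {
            if (e.hour > watermark) {
                out.add(e);
            }
            // Assumption: the watermark follows the highest timestamp seen so far.
            watermark = Math.max(watermark, e.hour);
        }
        return out;
    }

    /** Proposed behavior: every row reaches the function in arrival order. */
    static List<Event> forwardAll(List<Event> input) {
        return new ArrayList<>(input);
    }

    /** ORDER BY ts: late rows are dropped, the remaining rows are emitted in time order. */
    static List<Event> orderByTime(List<Event> input) {
        List<Event> out = dropLate(input);
        out.sort(Comparator.comparingInt((Event e) -> e.hour));
        return out;
    }

    /** The four input rows of the example, in arrival order. */
    static List<Event> exampleInput() {
        List<Event> input = new ArrayList<>();
        input.add(new Event(13, "Bob", 12));
        input.add(new Event(13, "Bob", 33));
        input.add(new Event(14, "Bob", 50));
        input.add(new Event(12, "Bob", 100));
        return input;
    }

    public static void main(String[] args) {
        System.out.println("current:  " + dropLate(exampleInput()));
        System.out.println("proposed: " + forwardAll(exampleInput()));
        System.out.println("order by: " + orderByTime(exampleInput()));
    }
}
```

In a real job, watermarks are emitted periodically rather than per record, so which rows count as late depends on the watermark strategy and timing.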

Improved state access via ValueView

The API proposed in FLIP-440 makes it very easy and intuitive for developers to declare and use value state, including defining a default initial value.

Example of current implementation:

public class GreetingWithMemory extends ProcessTableFunction<String> {
  public static class CountState {
    public long counter = 0L;
  }
  public void eval(@StateHint CountState state, @ArgumentHint(SET_SEMANTIC_TABLE) Row input) {
    state.counter++;
    collect("Hello " + input.getFieldAs("name") + ", your " + state.counter + " time?");
  }
}

For aggregate functions that need to access the accumulator for every row, using a POJO for value state is usually sufficient.

However, state access in PTFs is more complex, and not every state entry needs to be read and/or updated in every invocation. To enable future optimizations such as lazy deserialization or avoiding no-op serialization, a “supplier-like” pattern should be promoted for optimal performance.

This FLIP proposes the introduction of a ValueView that follows the design principles of MapView and ListView and adopts ValueState interfaces from DataStream API.

/**
 * A {@link DataView} that provides {@link Supplier}-like functionality in state entries.
 *
 * <p>A {@link ValueView} can be backed by a Java heap object or can leverage Flink's state
 * backends depending on the context. In many unbounded data scenarios, the {@link ValueView}
 * delegates all calls to a {@link ValueState} instead.
 *
 * @param <T> Type of the value
 */
@PublicEvolving
public class ValueView<T> implements DataView {

    private T value;

    /**
     * Returns the current value for this view. Returns {@code null} if no value has been set.
     *
     * @return The state value corresponding to the current input.
     */
    public T value() {
        return value;
    }

    /**
     * Updates the view to the given value.
     *
     * @param value The new value for the state, or null for clearing the value.
     */
    public void update(T value) {
        this.value = value;
    }

    /** Returns {@code true} if no value has been set for this view. */
    public boolean isEmpty() {
        return value == null;
    }

    @Override
    public void clear() {
        value = null;
    }
}

We will also support this class in aggregate functions for consistency.

In a PTF the example above can be rewritten to:

public class GreetingWithMemory extends ProcessTableFunction<String> {
  public void eval(@StateHint ValueView<Long> state, @ArgumentHint(SET_SEMANTIC_TABLE) Row input) {
    if (input.getFieldAs("condition").equals("true")) {
      // State is not read or deserialized if not required
      return;
    }
    Long newVal = 1L;
    if (!state.isEmpty()) {
      newVal = state.value() + 1;
    }
    // State is only written out and serialized when instructed to do so
    state.update(newVal);
    
    collect("Hello " + input.getFieldAs("name") + ", your " + newVal + " time?");
  }
}

ValueView vs. POJO: When to use what?

As a general rule of thumb, POJOs can be used for simple access cases when the PTF always reads and writes state in every invocation. ValueView should be preferred when state access is conditional.
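
The benefit of conditional access can be illustrated with a plain-Java model. This is a sketch under stated assumptions and not the proposed ValueView implementation: CountingStore stands in for a state backend and counts every read and write, and LazyValue is a hypothetical supplier-like handle that only touches the store when one of its methods is called.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of lazy state access; not the Flink ValueView class. */
final class LazyValueSketch {

    /** Hypothetical stand-in for a state backend that counts every access. */
    static final class CountingStore {
        private final Map<String, Long> entries = new HashMap<>();
        int reads;
        int writes;

        Long read(String key) {
            reads++;
            return entries.get(key);
        }

        void write(String key, Long value) {
            writes++;
            entries.put(key, value);
        }
    }

    /** Supplier-like handle: the store is only touched when a method is called. */
    static final class LazyValue {
        private final CountingStore store;
        private final String key;

        LazyValue(CountingStore store, String key) {
            this.store = store;
            this.key = key;
        }

        Long value() {
            return store.read(key);
        }

        void update(Long newValue) {
            store.write(key, newValue);
        }
    }

    /** Mirrors the conditional logic of the rewritten GreetingWithMemory example. */
    static String greet(LazyValue state, String name, boolean skip) {
        if (skip) {
            // Early exit: state is neither read nor written.
            return null;
        }
        Long current = state.value();
        long next = current == null ? 1L : current + 1;
        state.update(next);
        return "Hello " + name + ", your " + next + " time?";
    }
}
```

With a POJO, the runtime has to materialize the whole state object before eval() is called; with the handle, the skipped invocation costs no state access at all in this model.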

Improved state access via Broadcast State

The Broadcast State Pattern has been requested multiple times to make PTFs fully production ready. Users would like to control the behavior of a PTF at runtime via a control or rule input.

This FLIP proposes adopting the concept of broadcast state by integrating it into the existing signature and semantics. Note that this is clearly an extension to the SQL standard and gives PTFs a more Flink-specific flavor.

Example for BROADCAST_SEMANTIC_TABLE with BROADCAST state

The following example illustrates the usage of a broadcast table with broadcast state.

/**
 * A PTF that filters data based on a dynamic blocklist.
 */
public class RulePtf extends ProcessTableFunction<String> {

    public void eval(
            @StateHint(StateKind.BROADCAST) MapView<String, Double> badWords,
            @ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row data,
            @ArgumentHint(ArgumentTrait.BROADCAST_SEMANTIC_TABLE) Row rules)
            throws Exception {
        // Process broadcast input
        if (rules != null) {
            // Write access to broadcast state
            badWords.put(rules.getFieldAs("word"), rules.getFieldAs("score"));
            return;
        }
        // Process regular input
        if (data != null) {
            String sentence = data.getFieldAs("sentence");
            String[] words = sentence.toLowerCase(Locale.ROOT).split("\\s+");
            for (String w : words) {
                // Read access to broadcast state
                if (badWords.contains(w)) {
                    return;
                }
            }
            collect(sentence);
        }
    }
}

API Proposal

This includes the following API extensions:

New table argument kind: ArgumentTrait.BROADCAST_SEMANTIC_TABLE

public enum ArgumentTrait {
    /**
     * An argument that accepts a table with broadcast semantics. This trait only applies to {@link
     * ProcessTableFunction} (PTF).
     *
     * <p>A broadcast table argument serves as a side input to PTFs with {@link #ROW_SEMANTIC_TABLE}
     * or {@link #SET_SEMANTIC_TABLE} table arguments. An incoming row for this table is sent to
     * each PTF executing operator, independent of the key context defined by the PARTITION BY
     * clause. If necessary, the PTF operator is able to store the broadcast information in state
     * entries of kind {@link StateKind#BROADCAST}. When processing rows of the main table(s), the
     * broadcast state entry is available for read access.
     *
     * <p>By default, a change to a broadcast state entry has no effect on previously processed
     * stateful sets of a {@link #SET_SEMANTIC_TABLE}. Use {@link #NOTIFY_STATEFUL_SETS} to
     * re-evaluate all stateful sets based on the broadcast information.
     */
    BROADCAST_SEMANTIC_TABLE(true, StaticArgumentTrait.BROADCAST_SEMANTIC_TABLE),
}

Notes:

  • Coexists next to ROW_SEMANTIC_TABLE and SET_SEMANTIC_TABLE

  • At most one table argument can be declared as the “broadcast input side”

  • At least one ROW or SET table argument needs to be present in the signature as the “regular input side” next to the broadcast argument

  • Processing of data that comes in via BROADCAST_SEMANTIC_TABLE will be limited to broadcast state operations only. Thus, calling collect() or setting timers is not allowed.

  • Traits such as SUPPORT_UPDATES, REQUIRE_UPDATE_BEFORE, REQUIRE_FULL_DELETE, REQUIRE_ON_TIME equally apply to broadcast table arguments. In this sense, a broadcast table argument has similar behavior as a ROW_SEMANTIC_TABLE.

  • Note: the API has been designed in a way that will also let us run PTFs with this configuration in batch mode in the future
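
The delivery semantics described in these notes can be sketched in plain Java. This is a simplified model and not Flink runtime code: the class BroadcastDeliverySketch, its Instance class, and the hash-based routing of regular rows are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

/** Plain-Java model of broadcast delivery; not Flink runtime code. */
final class BroadcastDeliverySketch {

    /** One parallel PTF instance holding its own copy of the broadcast state. */
    static final class Instance {
        final Set<String> badWords = new HashSet<>();
        final List<String> output = new ArrayList<>();

        /** Broadcast side: only updates broadcast state and never emits. */
        void onRule(String word) {
            badWords.add(word.toLowerCase(Locale.ROOT));
        }

        /** Regular side: reads broadcast state and emits clean sentences. */
        void onSentence(String sentence) {
            for (String w : sentence.toLowerCase(Locale.ROOT).split("\\s+")) {
                if (badWords.contains(w)) {
                    return;
                }
            }
            output.add(sentence);
        }
    }

    final List<Instance> instances = new ArrayList<>();

    BroadcastDeliverySketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            instances.add(new Instance());
        }
    }

    /** A broadcast row is delivered to every instance. */
    void broadcast(String word) {
        for (Instance instance : instances) {
            instance.onRule(word);
        }
    }

    /** A regular row is delivered to exactly one instance (assumed: chosen by hash). */
    void send(String sentence) {
        int target = Math.floorMod(sentence.hashCode(), instances.size());
        instances.get(target).onSentence(sentence);
    }

    /** Collects the output of all instances. */
    List<String> allOutput() {
        List<String> all = new ArrayList<>();
        for (Instance instance : instances) {
            all.addAll(instance.output);
        }
        return all;
    }
}
```

Because every instance receives the rule, the filter result does not depend on which instance a regular row happens to be routed to.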

Introduction of StateKind

public @interface StateHint {

    /**
     * The kind of the state entry.
     *
     * <p>By default, the state entry is scoped to the set created by the PARTITION BY clause. It
     * uses Flink's keyed state.
     */
    StateKind value() default StateKind.PER_SET;
}

/** Kind of a state entry in a {@link ProcessTableFunction}. */
public enum StateKind {

    /**
     * Defines a state entry scoped to the set created by the PARTITION BY clause. It uses Flink's
     * keyed state.
     *
     * <p>During processing, a PTF instance can access this state for all rows sharing the same key.
     * Both for reading and writing. However, the state cannot be accessed by other keys.
     */
    PER_SET,

    /**
     * Defines a state entry broadcast to all sets, independent of the PARTITION BY clause. It uses
     * Flink's operator state.
     *
     * <p>During processing, only a broadcasting table can write to this state. However, all PTF
     * instances can read it, regardless of their specific key context.
     */
    BROADCAST
}

Notes:

  • The StateHint.value() annotation attribute is currently unused and can serve as a kind flag.

  • All existing @StateHint annotations will be implicitly declared as StateKind.PER_SET

  • The StateKind.BROADCAST kind will declare Flink’s broadcast state
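
The scoping rules of the two state kinds can be modeled in a few lines of plain Java. This is a hedged sketch and not how Flink implements keyed or broadcast state: StateScopeSketch, its Side enum, and the exception thrown on an illegal write are hypothetical and only illustrate the access rules stated in the Javadoc above.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of PER_SET vs. BROADCAST scoping; not Flink state backend code. */
final class StateScopeSketch {

    /** Which input side is currently being processed. */
    enum Side { REGULAR, BROADCAST }

    // PER_SET entries live under the partition key they were written for.
    private final Map<String, Map<String, String>> perSet = new HashMap<>();
    // BROADCAST entries are shared by all partition keys.
    private final Map<String, String> broadcast = new HashMap<>();

    /** PER_SET: readable and writable, but only within the given partition key. */
    void putPerSet(String partitionKey, String name, String value) {
        perSet.computeIfAbsent(partitionKey, k -> new HashMap<>()).put(name, value);
    }

    String getPerSet(String partitionKey, String name) {
        Map<String, String> entries = perSet.get(partitionKey);
        return entries == null ? null : entries.get(name);
    }

    /** BROADCAST: writable only while processing the broadcast side. */
    void putBroadcast(Side side, String name, String value) {
        if (side != Side.BROADCAST) {
            throw new IllegalStateException("Only the broadcast side may write broadcast state");
        }
        broadcast.put(name, value);
    }

    /** BROADCAST: readable regardless of the key context. */
    String getBroadcast(String name) {
        return broadcast.get(name);
    }
}
```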

New argument trait: ArgumentTrait.NOTIFY_STATEFUL_SETS

Especially when broadcasting rules and building a rule engine with a PTF, the question arises of how to re-evaluate previously processed keys with the broadcast information.

In DataStream API, the applyToKeyedState API enables forwarding broadcasts to keyed state. Similar behavior is offered via the NOTIFY_STATEFUL_SETS trait.

public enum ArgumentTrait {
    /**
     * Notifies all existing stateful sets to re-evaluate based on the given broadcast row and the
     * potentially new state in state entries of kind {@link StateKind#BROADCAST}.
     *
     * <p>Note: This trait is only valid for {@link #BROADCAST_SEMANTIC_TABLE} arguments.
     */
    NOTIFY_STATEFUL_SETS(false, StaticArgumentTrait.NOTIFY_STATEFUL_SETS),
}

Notes:

  • applyToKeyedState is expensive and could potentially slow down processing and checkpointing.

  • Therefore, NOTIFY_STATEFUL_SETS should be used with care and needs to be declared explicitly. This is a power-user feature.

  • By setting NOTIFY_STATEFUL_SETS, eval() is called multiple times for each broadcast row:

    • First, eval() is called with the broadcast row so that the broadcast state can be updated.

    • Then, eval() is called once for each key, with that key’s context set and the broadcast row as input.
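
The resulting call sequence can be traced with a plain-Java model of a deduplicating function whose per-key cache is reset by a broadcast row. This is a sketch and not Flink code: NotifyStatefulSetsSketch and its methods are hypothetical, the first call is assumed to run without a key context, and keys are visited in sorted order only to keep the trace deterministic (the notes above do not specify an order).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of the NOTIFY_STATEFUL_SETS call sequence; not Flink code. */
final class NotifyStatefulSetsSketch {

    // Keyed state: partition key -> cached payload (TreeMap for a deterministic order).
    private final Map<String, String> cacheByKey = new TreeMap<>();
    final List<String> calls = new ArrayList<>();
    final List<String> output = new ArrayList<>();

    /** Regular row: emit the payload only if nothing is cached for this key. */
    void onData(String key, String payload) {
        calls.add("eval(data, key=" + key + ")");
        if (!cacheByKey.containsKey(key)) {
            cacheByKey.put(key, payload);
            output.add(payload);
        }
    }

    /** Broadcast row: one call for broadcast state, then one call per stateful set. */
    void onBroadcast() {
        // Step 1 (assumed to run without a key context): update broadcast state.
        calls.add("eval(broadcast, no key)");
        // Step 2: re-evaluate every existing stateful set; here the cache is cleared.
        for (String key : new ArrayList<>(cacheByKey.keySet())) {
            calls.add("eval(broadcast, key=" + key + ")");
            cacheByKey.remove(key);
        }
    }
}
```

The loop in step 2 grows with the number of keys that hold state, which is why the trait has to be declared explicitly.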

Example for NOTIFY_STATEFUL_SETS

    /** A PTF that dedups data until broadcast cache invalidation. */
    public static class ControllableDedupPtf extends ProcessTableFunction<String> {

        public void eval(
                @StateHint Tuple1<String> cache,
                @ArgumentHint(SET_SEMANTIC_TABLE) Row data,
                @ArgumentHint({BROADCAST_SEMANTIC_TABLE, NOTIFY_STATEFUL_SETS}) Row resetBroadcast)
                throws Exception {
            // Process regular data
            if (data != null) {
                // If cache is empty, remember it and emit
                if (cache.f0 == null) {
                    cache.f0 = data.getFieldAs("payload");
                    collect(cache.f0);
                }
                // Don't emit duplicate
                return;
            }
            // Notification of stateful sets
            else if (resetBroadcast != null && cache != null) {
                // Clean the cache for this key
                cache.f0 = null;
            }
        }
    }

Compatibility, Deprecation, and Migration Plan

New features are fully compatible with the existing interfaces.

For the updated late data handling, ORDER BY serves as a migration option.

Test Plan

We will add RestoreTests and SemanticTests to thoroughly test the behavior.

Rejected Alternatives

Enable late data by argument trait

Pro:

  • No compatibility issues

Con:

  • The set of argument traits and the resulting feature matrix are already large enough; we should not introduce another trait just for the late data case

  • Why should a PTF with and without on_time produce different results? The on_time argument should not have an impact on losing data.