Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The ProcessTableFunction (PTF) feature has been well received by the Flink community and its adoption is increasing. Since FLIP-440 introduced many new APIs and concepts, some design decisions need minor adjustments regarding late data handling and lazy state access.
Also, talking to community members at the Current and Flink Forward conferences has shown that broadcast state is crucial to bridge the gap to DataStream API applications for broadcast joins and rule-based logic.
This FLIP proposes 3 PTF improvements:
1) Don't drop late data in ProcessTableFunction, as data loss is usually not intended; this aligns with DataStream API's ProcessFunction
2) Introduce ValueView to enable a “supplier”-pattern for state access; similar to MapView and ListView
3) Introduce BROADCAST_SEMANTIC_TABLE as a new kind of argument to PTFs
Public Interfaces
For 1) Late data handling
-- Support of the ORDER BY clause in PTFs
SELECT * FROM MyPtf(input => TABLE t PARTITION BY k ORDER BY ts)
For 2) ValueView
ValueView.value(): T
ValueView.update(T value)
ValueView.isEmpty(): boolean
ValueView.clear()
For 3) broadcast support
ArgumentTrait.BROADCAST_SEMANTIC_TABLE
ArgumentTrait.NOTIFY_STATEFUL_SETS
StaticArgumentTrait.BROADCAST_SEMANTIC_TABLE
StaticArgumentTrait.NOTIFY_STATEFUL_SETS
StateHint.value(): StateKind
StateKind.PER_SET
StateKind.BROADCAST
StateTypeStrategy.isBroadcast(CallContext callContext): boolean
Proposed Changes
Late data handling
Currently, the PTF operator silently drops late data. While this behavior was specified in FLIP-440, it has proven to be a design flaw. We propose updating this behavior to allow late data to enter the PTF.
Silently dropping records contradicts standard SQL semantics, which are typically agnostic to event-time lateness. Furthermore, relying on strict watermarking is often impractical, as watermark strategies are frequently dictated by upstream teams or are too complex for end-users to manage.
Therefore, the PTF implementer should have the autonomy to decide how to handle late data: some might drop it, some might handle it as a special case, and others might reroute it to a side output for late data. In most cases, however, users likely don't care whether data is late, as their implementation doesn't consider event-time semantics.
Since PTFs are a relatively new feature and are marked as @PublicEvolving, we suggest performing the change "as is". DataStream API's ProcessFunction implements similar behavior, so the current PTF implementation could be considered "a bug". It is likely that not many users have adopted PTFs yet, and even fewer users have observed the late data handling behavior.
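To make the proposed autonomy concrete, here is a minimal, Flink-independent Java sketch of a PTF that sees each row together with the current watermark and lets the implementer choose how to treat late rows. LateDataSketch, LatePolicy, and the simplified watermark model (the watermark equals the largest timestamp seen so far and advances after every row; a row is late if its timestamp is less than or equal to the watermark) are assumptions for illustration, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, Flink-independent model of a PTF that receives each row together with the
 * current watermark and decides itself what to do with late rows.
 */
final class LateDataSketch {

    /** Policies a PTF implementer might choose (illustrative names, not Flink API). */
    enum LatePolicy { FORWARD, DROP, SIDE_OUTPUT }

    /** A row of table t: event time in minutes since midnight, name, and score. */
    record Row(long ts, String name, int score) {}

    /** Regular output plus rows that were routed to a late-data side output. */
    record Result(List<Row> main, List<Row> late) {}

    /**
     * Processes rows in arrival order. Assumption: the watermark equals the largest timestamp
     * seen so far and advances after every row; a row is late if ts <= watermark.
     */
    static Result process(List<Row> input, LatePolicy policy) {
        List<Row> main = new ArrayList<>();
        List<Row> late = new ArrayList<>();
        long watermark = Long.MIN_VALUE;
        for (Row row : input) {
            boolean isLate = row.ts() <= watermark;
            if (!isLate || policy == LatePolicy.FORWARD) {
                main.add(row); // on-time rows, or every row if lateness is ignored
            } else if (policy == LatePolicy.SIDE_OUTPUT) {
                late.add(row); // keep late rows for separate handling
            }
            // LatePolicy.DROP: late rows are silently discarded (the current PTF behavior)
            watermark = Math.max(watermark, row.ts());
        }
        return new Result(main, late);
    }
}
```

Fed with the four Bob rows of the simplified example in this FLIP, DROP keeps only the scores 12 and 50 (the current behavior), FORWARD passes all four rows through, and SIDE_OUTPUT routes the scores 33 and 100 to the late list.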
ORDER BY
As a backward-compatible migration path, this FLIP also proposes to finally support the ORDER BY clause for time attributes in PTF table arguments. Given a SQL statement such as:
SELECT * FROM MyPtf(input => TABLE t PARTITION BY k ORDER BY ts)
a temporal sort operation is inserted, which leads to behavior similar to
MATCH_RECOGNIZE(PARTITION BY userid ORDER BY ts)
in StreamExecMatch, which also drops late data.
The ORDER BY supports sorting on multiple columns (e.g. ORDER BY ts ASC, c2 ASC, c3 DESC); however, the first column must be a watermarked time attribute in ascending order. In case of multiple table arguments, the ORDER BY is executed per input table (as a sorting operator before the operator executing the PTF).
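The temporal sort that ORDER BY would insert can be pictured as a buffer that releases rows in sort order once the watermark passes them and drops rows that arrive at or behind the watermark. The following Java class is a hypothetical sketch of that idea for ORDER BY ts ASC, c2 ASC; TemporalSortSketch and its methods are illustrative names, and the actual sort operator in the Flink planner may differ in details such as tie handling and state management.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical model of the temporal sort inserted for ORDER BY ts ASC, c2 ASC. */
final class TemporalSortSketch {

    /** Row with a watermarked time attribute (ts) and a secondary sort column (c2). */
    record Row(long ts, String c2, int score) {}

    // The time attribute must come first and be ascending; further columns break ties.
    private static final Comparator<Row> ORDER =
            Comparator.comparingLong(Row::ts).thenComparing(Row::c2);

    private final PriorityQueue<Row> buffer = new PriorityQueue<>(ORDER);
    private final List<Row> output = new ArrayList<>();
    private long watermark = Long.MIN_VALUE;

    /** Buffers an on-time row; rows at or behind the watermark cannot be sorted and are dropped. */
    void onRow(Row row) {
        if (row.ts() <= watermark) {
            return; // late row: dropped, as in MATCH_RECOGNIZE
        }
        buffer.add(row);
    }

    /** Emits, in sort order, all buffered rows whose timestamp is covered by the watermark. */
    void onWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        while (!buffer.isEmpty() && buffer.peek().ts() <= watermark) {
            output.add(buffer.poll());
        }
    }

    List<Row> output() {
        return output;
    }
}
```

Calling onRow() for each of the four Bob rows of the example, each followed by onWatermark() with the largest timestamp seen so far, leaves only the rows with scores 12 and 50 in the output, which matches the ORDER BY variant of the example.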
Example
The following example illustrates the behavior of a PTF when handling late data:
-- Given:
CREATE TABLE t (ts TIMESTAMP(3), name STRING, score INT, WATERMARK FOR ts AS ts)

-- Input Data:
13:00, Bob, 12
13:00, Bob, 33
14:00, Bob, 50
12:00, Bob, 100

-- Function call:
SELECT * FROM IdentityPtf(input => TABLE t PARTITION BY name, on_time => ts)

-- Current Implementation Output (simplified):
-- Data is dropped
13:00, Bob, 12
14:00, Bob, 50

-- Proposed Output (simplified):
-- Just forward as you would expect it from an identity function
13:00, Bob, 12
13:00, Bob, 33
14:00, Bob, 50
12:00, Bob, 100

-- Function call:
SELECT * FROM IdentityPtf(input => TABLE t PARTITION BY name ORDER BY ts, on_time => ts)

-- Proposed Output (simplified):
-- People expect "best-effort sorting" on a stream of data,
-- thus missing data is more acceptable
13:00, Bob, 12
14:00, Bob, 50
Improved state access via ValueView
The API proposed in FLIP-440 makes it very easy and intuitive for developers to declare and use value state, including defining a default initial value.
Example of current implementation:
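import static org.apache.flink.table.annotation.ArgumentTrait.SET_SEMANTIC_TABLE;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

public class GreetingWithMemory extends ProcessTableFunction<String> {
    public static class CountState {
        public long counter = 0L;
    }

    public void eval(@StateHint CountState state, @ArgumentHint(SET_SEMANTIC_TABLE) Row input) {
        state.counter++;
        collect("Hello " + input.getFieldAs("name") + ", your " + state.counter + " time?");
    }
}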
For aggregate functions that need to access the accumulator for every row, using a POJO for value state is usually sufficient.
However, state access in PTFs is more complex, and not every state entry needs to be read and/or updated. For future optimizations such as lazy deserialization or avoiding no-op serialization, a "supplier-like" pattern should be promoted for optimal performance.
This FLIP proposes the introduction of a ValueView that follows the design principles of MapView and ListView and adopts the ValueState interface from the DataStream API.
/**
 * A {@link DataView} that provides {@link Supplier}-like functionality for state entries.
 *
 * <p>A {@link ValueView} can be backed by a Java heap object or can leverage Flink's state
 * backends depending on the context. In many unbounded data scenarios, the {@link ValueView}
 * delegates all calls to a {@link ValueState} instead.
 *
 * @param <T> Type of the value
 */
@PublicEvolving
public class ValueView<T> implements DataView {

    private T value;

    /**
     * Returns the current value for this view. Returns {@code null} if no value has been set.
     *
     * @return The state value corresponding to the current input.
     */
    public T value() {
        return value;
    }

    /**
     * Updates the view to the given value.
     *
     * @param value The new value for the state, or null for clearing the value.
     */
    public void update(T value) {
        this.value = value;
    }

    /** Returns {@code true} if no value has been set. */
    public boolean isEmpty() {
        return value == null;
    }

    @Override
    public void clear() {
        value = null;
    }
}
We will also support this class in aggregate functions for consistency.
In a PTF the example above can be rewritten to:
public class GreetingWithMemory extends ProcessTableFunction<String> {
    public void eval(@StateHint ValueView<Long> state, @ArgumentHint(SET_SEMANTIC_TABLE) Row input) {
        if ("true".equals(input.getFieldAs("condition"))) {
            // State is not read or deserialized if not required
            return;
        }
        long newVal = 1L;
        if (!state.isEmpty()) {
            newVal = state.value() + 1;
        }
        // State is only written out and serialized when instructed to do so
        state.update(newVal);
        collect("Hello " + input.getFieldAs("name") + ", your " + newVal + " time?");
    }
}
ValueView vs. POJO: When to use what?
As a general rule of thumb, POJOs can be used for simple access cases when the PTF always reads and writes state in every invocation. ValueView should be preferred when state access is conditional.
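The rule of thumb above is mainly about avoiding unnecessary (de)serialization. The following Java class is a hypothetical sketch of how a ValueView-like wrapper could defer both steps: bytes are only deserialized on value() and only serialized on flush() if update() or clear() was called. LazyValueSketch, flush(), and the counters are assumptions made for illustration; they do not describe how Flink will implement ValueView.

```java
import java.util.function.Function;

/**
 * Hypothetical sketch of the "supplier-like" idea behind ValueView: stored bytes are only
 * deserialized when value() is called, and only serialized again after update() or clear().
 * This is not Flink's implementation; the serializer functions are assumptions.
 */
final class LazyValueSketch<T> {
    private final Function<byte[], T> deserializer;
    private final Function<T, byte[]> serializer;
    private byte[] stored;   // bytes held by the (simulated) state backend, null if empty
    private T cached;        // deserialized or updated value, valid once loaded is true
    private boolean loaded;
    private boolean dirty;
    int deserializations;    // counters that make the laziness observable
    int serializations;

    LazyValueSketch(byte[] stored, Function<byte[], T> deserializer, Function<T, byte[]> serializer) {
        this.stored = stored;
        this.deserializer = deserializer;
        this.serializer = serializer;
    }

    T value() {
        if (!loaded) {
            if (stored != null) {
                cached = deserializer.apply(stored);
                deserializations++;
            }
            loaded = true;
        }
        return cached;
    }

    /** Answers without deserializing (assumption: emptiness is known from the raw bytes). */
    boolean isEmpty() {
        return loaded ? cached == null : stored == null;
    }

    void update(T newValue) {
        cached = newValue;
        loaded = true;
        dirty = true;
    }

    void clear() {
        update(null);
    }

    /** Called by the simulated runtime after eval(): writes back only if something changed. */
    void flush() {
        if (!dirty) {
            return; // no-op access: nothing to serialize
        }
        if (cached == null) {
            stored = null;
        } else {
            stored = serializer.apply(cached);
            serializations++;
        }
        dirty = false;
    }

    byte[] stored() {
        return stored;
    }
}
```

In the early-return branch of the ValueView example, a wrapper like this performs no (de)serialization at all; the increment branch performs exactly one of each.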
Improved state access via Broadcast State
The Broadcast State Pattern has been requested multiple times to make PTFs fully production-ready. Users would like to control the behavior of a PTF at runtime via a control or rule input.
This FLIP proposes to adopt the concept of broadcast state by integrating it into the existing signature and semantics. Note that this is clearly an extension to the SQL standard and gives PTFs a more Flink-specific flavor.
Example for BROADCAST_SEMANTIC_TABLE with BROADCAST state
The following example illustrates the usage of a broadcast table with broadcast state.
/**
 * A PTF that filters data based on a dynamic denylist.
 */
public class RulePtf extends ProcessTableFunction<String> {
    public void eval(
            @StateHint(StateKind.BROADCAST) MapView<String, Double> badWords,
            @ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row data,
            @ArgumentHint(ArgumentTrait.BROADCAST_SEMANTIC_TABLE) Row rules)
            throws Exception {
        // Process broadcast input
        if (rules != null) {
            // Write access to broadcast state
            badWords.put(rules.getFieldAs("word"), rules.getFieldAs("score"));
            return;
        }
        // Process regular input
        if (data != null) {
            String sentence = data.getFieldAs("sentence");
            String[] words = sentence.toLowerCase(Locale.ROOT).split("\\s+");
            for (String w : words) {
                // Read access to broadcast state
                if (badWords.contains(w)) {
                    return;
                }
            }
            collect(sentence);
        }
    }
}
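To illustrate the distribution semantics behind RulePtf, the following Java class simulates several parallel PTF instances without Flink: each broadcast row is delivered to every instance, which keeps its own copy of the broadcast state, while each regular row reaches exactly one instance. BroadcastRoutingSketch is hypothetical, and routing regular rows by hashing a key is an assumption made for determinism.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/**
 * Hypothetical simulation of the broadcast pattern behind RulePtf: every rule row is sent to
 * all parallel instances, each regular row to exactly one. Not Flink's runtime.
 */
final class BroadcastRoutingSketch {

    /** One parallel PTF instance holding its own copy of the broadcast state. */
    static final class Instance {
        final Map<String, Double> badWords = new HashMap<>(); // models StateKind.BROADCAST
        final List<String> collected = new ArrayList<>();

        // Broadcast input: write access to broadcast state, no output
        void onRule(String word, double score) {
            badWords.put(word, score);
        }

        // Regular input: read-only access to broadcast state
        void onSentence(String sentence) {
            for (String w : sentence.toLowerCase(Locale.ROOT).split("\\s+")) {
                if (badWords.containsKey(w)) {
                    return; // filtered by a rule
                }
            }
            collected.add(sentence);
        }
    }

    final List<Instance> instances = new ArrayList<>();

    BroadcastRoutingSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            instances.add(new Instance());
        }
    }

    /** A row of the broadcast table argument reaches every instance. */
    void sendRule(String word, double score) {
        for (Instance instance : instances) {
            instance.onRule(word, score);
        }
    }

    /** A regular row reaches one instance; hashing a routing key is an assumption here. */
    void sendSentence(String routingKey, String sentence) {
        int target = Math.floorMod(routingKey.hashCode(), instances.size());
        instances.get(target).onSentence(sentence);
    }

    List<String> allOutput() {
        List<String> all = new ArrayList<>();
        for (Instance instance : instances) {
            all.addAll(instance.collected);
        }
        return all;
    }
}
```

Because every instance receives every rule, a sentence is filtered consistently no matter which instance it lands on.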
API Proposal
This includes the following API extensions:
New table argument kind: ArgumentTrait.BROADCAST_SEMANTIC_TABLE
public enum ArgumentTrait {

    /**
     * An argument that accepts a table with broadcast semantics. This trait only applies to {@link
     * ProcessTableFunction} (PTF).
     *
     * <p>A broadcast table argument serves as a side input to PTFs with {@link #ROW_SEMANTIC_TABLE}
     * or {@link #SET_SEMANTIC_TABLE} table arguments. An incoming row for this table is sent to
     * each parallel instance of the PTF operator, independent of the key context defined by the
     * PARTITION BY clause. If necessary, the PTF operator is able to store the broadcast
     * information in state entries of kind {@link StateKind#BROADCAST}. When processing rows of
     * the main table(s), the broadcast state entry is available for read access.
     *
     * <p>By default, a change to a broadcast state entry has no effect on previously processed
     * stateful sets of a {@link #SET_SEMANTIC_TABLE}. Use {@link #NOTIFY_STATEFUL_SETS} to
     * re-evaluate all stateful sets based on the broadcast information.
     */
    BROADCAST_SEMANTIC_TABLE(true, StaticArgumentTrait.BROADCAST_SEMANTIC_TABLE),
}
Notes:
- Coexists next to ROW_SEMANTIC_TABLE and SET_SEMANTIC_TABLE.
- At most one table argument can be declared as the "broadcast input side".
- At least one ROW_SEMANTIC_TABLE or SET_SEMANTIC_TABLE argument needs to be present in the signature as the "regular input side".
- Processing of data that comes in via BROADCAST_SEMANTIC_TABLE will be limited to broadcast state operations. Thus, calling collect() or setting timers is not allowed.
- Traits such as SUPPORT_UPDATES, REQUIRE_UPDATE_BEFORE, REQUIRE_FULL_DELETE, and REQUIRE_ON_TIME equally apply to broadcast table arguments. In this sense, a broadcast table argument behaves similarly to a ROW_SEMANTIC_TABLE.
- Note: the API has been designed in a way that will also let us run PTFs with this configuration in batch mode in the future.
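The notes above translate into two kinds of checks: a signature check when the PTF is declared and a runtime check while a broadcast row is processed. The following Java class sketches both under assumed names (BroadcastRulesSketch, TableKind, OutputGuard); it mirrors the listed rules but is not Flink's type inference or runtime code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/**
 * Hypothetical sketch of the broadcast rules: signature validation for table arguments and a
 * runtime guard for broadcast rows. Names mirror the proposal but are not Flink classes.
 */
final class BroadcastRulesSketch {

    enum TableKind { ROW_SEMANTIC_TABLE, SET_SEMANTIC_TABLE, BROADCAST_SEMANTIC_TABLE }

    /** Returns an error message if the table arguments violate the proposed rules. */
    static Optional<String> validate(List<TableKind> tableArguments) {
        long broadcast = tableArguments.stream()
                .filter(kind -> kind == TableKind.BROADCAST_SEMANTIC_TABLE)
                .count();
        long regular = tableArguments.size() - broadcast;
        if (broadcast > 1) {
            return Optional.of("At most one table argument can be a broadcast table.");
        }
        if (broadcast == 1 && regular == 0) {
            return Optional.of("A broadcast table needs at least one ROW or SET semantic table.");
        }
        return Optional.empty();
    }

    /** Rejects collect() and timers while a broadcast row is processed. */
    static final class OutputGuard {
        private boolean processingBroadcastRow;
        final List<String> output = new ArrayList<>();

        void beginRow(boolean isBroadcastRow) {
            processingBroadcastRow = isBroadcastRow;
        }

        void collect(String value) {
            if (processingBroadcastRow) {
                throw new IllegalStateException("collect() is not allowed for broadcast rows.");
            }
            output.add(value);
        }

        void registerTimer(long time) {
            if (processingBroadcastRow) {
                throw new IllegalStateException("Timers are not allowed for broadcast rows.");
            }
            // Timer bookkeeping is omitted in this sketch.
        }
    }
}
```

A signature of one SET_SEMANTIC_TABLE plus one BROADCAST_SEMANTIC_TABLE passes validate(), while a broadcast table on its own, or two broadcast tables, is rejected.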
Introduction of StateKind
public @interface StateHint {

    /**
     * The kind of the state entry.
     *
     * <p>By default, the state entry is scoped to the set created by the PARTITION BY clause. It
     * uses Flink's keyed state.
     */
    StateKind value() default StateKind.PER_SET;
}
/** Kind of a {@link ProcessTableFunction}'s state entry. */
public enum StateKind {

    /**
     * Defines a state entry scoped to the set created by the PARTITION BY clause. It uses Flink's
     * keyed state.
     *
     * <p>During processing, a PTF instance can read and write this state for all rows sharing the
     * same key. However, the state cannot be accessed by other keys.
     */
    PER_SET,

    /**
     * Defines a state entry broadcast to all sets, independent of the PARTITION BY clause. It uses
     * Flink's operator state.
     *
     * <p>During processing, only rows of a broadcast table argument can write to this state.
     * However, all PTF instances can read it, regardless of their specific key context.
     */
    BROADCAST
}
Notes:
- The value() attribute of @StateHint is currently unused and can therefore serve as the kind flag, e.g. @StateHint(StateKind.BROADCAST).
- All existing @StateHint annotations will be implicitly declared as StateKind.PER_SET.
- The StateKind.BROADCAST kind will declare Flink's broadcast state.
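The access rules of the two state kinds can be summarized in a small, Flink-independent Java sketch. StateKindSketch is hypothetical; it models the key context as a nullable string, where null stands for processing a broadcast row, and it does not model NOTIFY_STATEFUL_SETS.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of the documented StateKind access rules. A null key models the
 * processing of a broadcast row (no key context). Not Flink's state backend.
 */
final class StateKindSketch {

    enum StateKind { PER_SET, BROADCAST }

    private final Map<String, Long> perSet = new HashMap<>(); // keyed state: key -> value
    private Long broadcast;                                   // operator-wide broadcast value
    private String currentKey;

    /** Sets the key context; null means a broadcast row is being processed. */
    void setKeyContext(String key) {
        currentKey = key;
    }

    Long read(StateKind kind) {
        if (kind == StateKind.BROADCAST) {
            return broadcast; // readable regardless of the key context
        }
        requireKeyContext();
        return perSet.get(currentKey); // only the entry of the current key is visible
    }

    void write(StateKind kind, Long value) {
        if (kind == StateKind.BROADCAST) {
            if (currentKey != null) {
                throw new IllegalStateException("BROADCAST state is read-only for regular rows.");
            }
            broadcast = value;
            return;
        }
        requireKeyContext();
        perSet.put(currentKey, value);
    }

    private void requireKeyContext() {
        if (currentKey == null) {
            throw new IllegalStateException("PER_SET state requires a key context.");
        }
    }
}
```

In this sketch, a value written to BROADCAST state while processing a broadcast row is visible to every key, whereas a PER_SET value written for one key stays invisible to all others.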
New argument trait: ArgumentTrait.NOTIFY_STATEFUL_SETS
Especially when broadcasting rules and building a rule engine with a PTF, the question arises how to re-evaluate previously processed keys with the broadcast information.
In the DataStream API, applyToKeyedState enables applying broadcast information to the keyed state of all keys. The NOTIFY_STATEFUL_SETS trait offers similar behavior.
public enum ArgumentTrait {

    /**
     * Notifies all existing stateful sets to re-evaluate based on the given broadcast row and the
     * potentially new state in state entries of kind {@link StateKind#BROADCAST}.
     *
     * <p>Note: This trait is only valid for {@link #BROADCAST_SEMANTIC_TABLE} arguments.
     */
    NOTIFY_STATEFUL_SETS(false, StaticArgumentTrait.NOTIFY_STATEFUL_SETS),
}
Notes:
- applyToKeyedState is expensive and could potentially slow down processing and checkpointing. Therefore, NOTIFY_STATEFUL_SETS should be used with care and needs to be declared explicitly. This is a power-user feature.
- By setting NOTIFY_STATEFUL_SETS, eval() is called multiple times:
  - First, eval() is called once to update the broadcast state only.
  - Then, for each key, eval() is called with a set key context and the broadcast table input.
Example for NOTIFY_STATEFUL_SETS
/** A PTF that dedups data until broadcast cache invalidation. */
public static class ControllableDedupPtf extends ProcessTableFunction<String> {
    public void eval(
            @StateHint Tuple1<String> cache,
            @ArgumentHint(SET_SEMANTIC_TABLE) Row data,
            @ArgumentHint({BROADCAST_SEMANTIC_TABLE, NOTIFY_STATEFUL_SETS}) Row resetBroadcast)
            throws Exception {
        // Process regular data
        if (data != null) {
            // If the cache is empty, remember the payload and emit it
            if (cache.f0 == null) {
                cache.f0 = data.getFieldAs("payload");
                collect(cache.f0);
            }
            // Don't emit duplicates
            return;
        }
        // Notification of stateful sets
        if (resetBroadcast != null && cache != null) {
            // Clean the cache for this key
            cache.f0 = null;
        }
    }
}
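The call sequence of NOTIFY_STATEFUL_SETS can be traced with a plain Java simulation of ControllableDedupPtf. NotifyStatefulSetsSketch is hypothetical: it models PER_SET state as a map from key to cached payload and represents the broadcast-only call as an eval() without key context, following the description in the notes above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical simulation of NOTIFY_STATEFUL_SETS for ControllableDedupPtf. A broadcast row
 * triggers one eval() without key context, then one eval() per existing key. Not Flink's runtime.
 */
final class NotifyStatefulSetsSketch {
    private final Map<String, String> cache = new HashMap<>(); // PER_SET state: key -> payload
    final List<String> output = new ArrayList<>();

    /** Mirrors the eval() of ControllableDedupPtf; key == null means "no key context". */
    private void eval(String key, String payload, boolean resetBroadcast) {
        if (payload != null) {
            if (cache.get(key) == null) {
                cache.put(key, payload); // remember the first payload and emit it
                output.add(payload);
            }
            return; // don't emit duplicates
        }
        if (resetBroadcast && key != null) {
            cache.put(key, null); // clean the cache for this key
        }
    }

    void onData(String key, String payload) {
        eval(key, payload, false);
    }

    void onResetBroadcast() {
        // 1) Broadcast-only call; this example keeps no broadcast state, so it is a no-op
        eval(null, null, true);
        // 2) Re-evaluate every existing stateful set (iterate over a copy of the keys)
        for (String key : new ArrayList<>(cache.keySet())) {
            eval(key, null, true);
        }
    }
}
```

For example, after the data rows (k1, a), (k1, b), (k2, x), only a and x are emitted; after onResetBroadcast(), the payloads c for k1 and x for k2 are emitted again because both caches were cleared.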
Compatibility, Deprecation, and Migration Plan
New features are fully compatible with the existing interfaces.
For the updated late data handling, ORDER BY serves as a migration option.
Test Plan
We add RestoreTests and SemanticTests for battle testing the behavior.
Rejected Alternatives
Enable late data by argument trait
Pro:
No compatibility issues
Con:
The number of argument traits and the resulting feature matrix are already large; we should not introduce another trait just for the late data case.
Why should a PTF receive different results with and without on_time? Declaring on_time should not cause data loss.