Motivation
Currently, testing Process Table Functions (PTFs) requires full integration tests against a running Flink cluster. This FLIP introduces a test harness for unit testing PTFs that exposes their output, state, timers, and watermarks for assertions.
We want its API to be designed for PTF developers, to make use of Flink's type inference to catch errors during test setup, and to behave as closely as possible to production. For example:
@DataTypeHint("ROW<name STRING>")
public static class ThresholdFilterFunction extends ProcessTableFunction<Row> {
public void eval(
@ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row row,
Integer threshold) {
Integer value = row.getFieldAs(1);
if (value >= threshold) {
collect(Row.of(row.getField(0)));
}
}
}
@Test
void testThresholdFunction() throws Exception {
ProcessTableFunctionTestHarness<Row> harness =
ProcessTableFunctionTestHarness.ofClass(ThresholdFilterFunction.class)
.withScalarArgument("threshold", 50)
.build();
harness.processElement("Alice", 100);
harness.processElement("Bob", 30);
harness.processElement("Charlie", 75);
List<Row> output = harness.getOutput();
assertThat(output).hasSize(2);
assertThat(output.get(0).getField(0)).isEqualTo("Alice");
assertThat(output.get(1).getField(0)).isEqualTo("Charlie");
}
The harness also aims to provide a stable, easy-to-extend framework in table-test-utils, so that adding new PTF features to the harness requires minimal effort as they become supported. It also aims to be lightweight and to depend as little as possible on the heavier parts of Flink.
Public Interfaces
This FLIP introduces the ProcessTableFunctionTestHarness<OUT> class and supporting types.
public class ProcessTableFunctionTestHarness<OUT> implements AutoCloseable {
// -------------------------------------------------------------------------
// Builder
// -------------------------------------------------------------------------
/**
* Creates a builder for a test harness for the given PTF class.
*/
public static <OUT> Builder<OUT> ofClass(
Class<? extends ProcessTableFunction<OUT>> functionClass);
/**
* Builder for configuring a {@link ProcessTableFunctionTestHarness}.
* The builder validates the PTF signature and argument configuration at build time.
*/
public static class Builder<OUT> {
// Scalar Arguments
/**
* Configures a scalar argument (non-table parameter) for the PTF.
*
* The argument name must match the name specified in the eval() method's
* parameter annotations or parameter name.
*/
public Builder<OUT> withScalarArgument(String argumentName, Object value);
// Table Arguments
/**
* Declares the underlying data type of the table argument.
* Used to validate input rows and to resolve the columns passed to withOnTimeColumn.
*/
public Builder<OUT> withTableArgumentType(String argumentName, AbstractDataType<?> dataType);
/**
* Declares a table argument with one or more static rows.
* Intended for testing multi-input PTFs where one table provides static/fixture data and
* another receives dynamic data during the test.
*
* When eval() is called for elements from other tables, either the single static row is
* passed as the value for this table argument, or the rows are provided according to the
* table semantics (row by row for ROW_SEMANTIC_TABLE, as a set for SET_SEMANTIC_TABLE).
*/
public Builder<OUT> withTableArgument(String argumentName, Row row);
public Builder<OUT> withTableArgument(String argumentName, Row... rows);
// Partitioning
/**
* Configures PARTITION BY for a table argument, using column positions or column names.
*/
public Builder<OUT> withPartitionBy(String argumentName, int... columnPositions);
public Builder<OUT> withPartitionBy(String argumentName, String... columnNames);
// Time Configuration
/**
* Configures the event time columns for the table inputs.
* The provided column names should be present on the table arguments, with their structure
* declared via withTableArgumentType.
*/
public Builder<OUT> withOnTimeColumn(String... columnNames);
/**
* Sets the initial watermark timestamp for all table inputs.
*/
public Builder<OUT> withInitialWatermark(long watermark);
public Builder<OUT> withInitialWatermark(Instant watermark);
public Builder<OUT> withInitialWatermark(LocalDateTime watermark);
/**
* Sets the initial watermark timestamp for a specific table input.
*/
public Builder<OUT> withInitialWatermarkForTable(String tableArgument, long watermark);
public Builder<OUT> withInitialWatermarkForTable(String tableArgument, Instant watermark);
public Builder<OUT> withInitialWatermarkForTable(String tableArgument, LocalDateTime watermark);
/**
* Sets the initial system clock time to the specified timestamp.
*/
public Builder<OUT> withInitialSystemClock(long timestamp) throws Exception;
public Builder<OUT> withInitialSystemClock(Instant timestamp) throws Exception;
public Builder<OUT> withInitialSystemClock(LocalDateTime timestamp) throws Exception;
// State Initialization
/**
* Sets the initial state for the PTF for a particular state argument and partition key.
*/
public <K, S> Builder<OUT> withInitialStateArgument(String stateArgument, K key, S state);
/**
* Initialises the harness from a previous test state snapshot.
*/
public Builder<OUT> restoreFromSnapshot(TestSnapshot snapshot);
/**
* Builds the harness, performs pre-flight PTF validation and calls the PTF's open() method.
*/
public ProcessTableFunctionTestHarness<OUT> build() throws Exception;
}
// Test State Snapshotting
public TestSnapshot snapshot();
// Processing Elements
/**
* Processes a single row. Only valid for PTFs that have a single table argument.
*/
public void processElement(Row row) throws Exception;
public void processElement(Object... values) throws Exception;
public void processElement(RowKind rowKind, Object... values) throws Exception;
/**
* Processes a single row for a particular table argument.
*/
public void processElementForTable(String tableArgument, Row row) throws Exception;
public void processElementForTable(String tableArgument, Object... values) throws Exception;
public void processElementForTable(String tableArgument, RowKind rowKind, Object... values)
throws Exception;
// Time & Watermarks
/**
* Advances the watermark to the specified timestamp for all inputs.
* This may trigger timers that are scheduled at or before this timestamp.
*/
public void advanceWatermark(long timestamp) throws Exception;
public void advanceWatermark(Instant timestamp) throws Exception;
public void advanceWatermark(LocalDateTime timestamp) throws Exception;
/**
* Advances the watermark to the specified timestamp for the specified table input.
* This may trigger timers that are scheduled at or before this timestamp.
*/
public void advanceWatermark(String tableArgument, long timestamp) throws Exception;
public void advanceWatermark(String tableArgument, Instant timestamp) throws Exception;
public void advanceWatermark(String tableArgument, LocalDateTime timestamp) throws Exception;
/**
* Gets the current event time watermark for a particular table argument.
*/
public <TimeType> TimeType getCurrentWatermarkForTable(String tableArgument, Class<TimeType> conversionClass);
/**
* Advances the system clock time to the specified timestamp.
* This may trigger state TTL effects.
*/
public void advanceSystemClock(long timestamp) throws Exception;
public void advanceSystemClock(Instant timestamp) throws Exception;
public void advanceSystemClock(LocalDateTime timestamp) throws Exception;
// Output
/**
* Returns all output collected so far.
*/
public List<OUT> getOutput();
/**
* Clears all collected output.
*/
public void clearOutput();
// State Introspection
/**
* Gets the state for a particular state argument and partition key.
*/
public <S> S getStateForKey(String stateArgument, Row key, Class<S> stateClass) throws Exception;
/**
* Sets the state for a specific state argument and partition key.
*/
public <S> void setStateForKey(String stateArgument, Row key, S state) throws Exception;
/**
* Gets all partition keys that have state for a state argument.
*/
public <K> Set<K> getStateKeys(String stateArgument) throws Exception;
/**
* Gets state for all partitions for a state argument.
*/
public <S> Map<Row, S> getAllState(String stateArgument, Class<S> stateClass)
throws Exception;
/**
* Clears state for the partition key and argument.
*/
public void clearStateForKey(String stateArgument, Row key) throws Exception;
/**
* Clears state for the state argument.
*/
public void clearState(String stateArgument) throws Exception;
/**
* Clears all state for all arguments and partitions.
*/
public void clearAllState() throws Exception;
// Timer Introspection
/**
* Represents a registered timer.
*/
public static class Timer {
/** Returns the timestamp at which this timer will fire. */
public <TimeType> TimeType getTimestamp(Class<TimeType> conversionClass);
/** Returns the timer name. */
@Nullable
public String getName();
/** Returns the partition key this timer belongs to. */
@Nullable
public Object getKey();
/** Returns whether the timer has fired or not. */
public boolean hasFired();
}
/**
* Gets pending timers.
*/
public List<Timer> getPendingTimers() throws Exception;
public List<Timer> getPendingTimers(String timerName) throws Exception;
/**
* Gets timers that have fired.
*/
public List<Timer> getFiredTimers() throws Exception;
public List<Timer> getFiredTimers(String timerName) throws Exception;
/**
* Clears the fired timer history.
*/
public void clearFiredTimers();
}
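Most time-related methods above are overloaded for long, Instant and LocalDateTime. The following is a minimal sketch, not the proposed implementation, of how such overloads could be normalised to epoch milliseconds and converted back for getters such as getCurrentWatermarkForTable(). TimeNormalizationSketch is a hypothetical name. The sketch assumes that LocalDateTime values (TIMESTAMP columns) are interpreted as UTC and that Instant values (TIMESTAMP_LTZ columns) are absolute points in time.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper; assumes LocalDateTime is interpreted as UTC.
final class TimeNormalizationSketch {
    private TimeNormalizationSketch() {}

    static long toEpochMillis(long epochMillis) {
        return epochMillis;
    }

    static long toEpochMillis(Instant instant) {
        return instant.toEpochMilli();
    }

    static long toEpochMillis(LocalDateTime dateTime) {
        return dateTime.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Converts an internal epoch-millis value into the class a getter was asked for.
    static <T> T fromEpochMillis(long epochMillis, Class<T> conversionClass) {
        if (conversionClass == Long.class) {
            return conversionClass.cast(epochMillis);
        } else if (conversionClass == Instant.class) {
            return conversionClass.cast(Instant.ofEpochMilli(epochMillis));
        } else if (conversionClass == LocalDateTime.class) {
            return conversionClass.cast(
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC));
        }
        throw new IllegalArgumentException("Unsupported conversion class: " + conversionClass);
    }
}
```

Normalising early keeps timers, watermarks and the system clock comparable as plain long values, regardless of which overload a test used.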
Proposed Changes
Basic Changes
The harness will support setting up the initial conditions of a test, including the PTF class under test and its initial arguments, state, etc. On building, the harness will perform pre-flight validation (type inference on the PTF signature, validation of argument names and table arguments, etc.). Once built, the harness can process records, and both the resulting output and the PTF's internal state can be inspected and asserted on.
@DataTypeHint("ROW<name STRING>")
public static class ThresholdFilterFunction extends ProcessTableFunction<Row> {
public void eval(
@ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row row,
Integer threshold) {
Integer value = row.getFieldAs("value");
if (value >= threshold) {
collect(Row.of(row.getField("user")));
}
}
}
@Test
void testThresholdFunction() throws Exception {
ProcessTableFunctionTestHarness<Row> harness =
ProcessTableFunctionTestHarness.ofClass(ThresholdFilterFunction.class)
.withTableArgumentType("row", DataTypes.of("ROW<user STRING, value INT>"))
.withScalarArgument("threshold", 50)
.build();
Row rowA = Row.withNames();
rowA.setField("user", "Alice");
rowA.setField("value", 100);
harness.processElement(rowA);
Row rowB = Row.withNames();
rowB.setField("user", "Bob");
rowB.setField("value", 30);
harness.processElement(rowB);
Row rowC = Row.withNames();
rowC.setField("user", "Charlie");
rowC.setField("value", 75);
harness.processElement(rowC);
List<Row> output = harness.getOutput();
assertThat(output).hasSize(2);
assertThat(output.get(0).getField(0)).isEqualTo("Alice");
assertThat(output.get(1).getField(0)).isEqualTo("Charlie");
}
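The pre-flight checks that build() performs can be illustrated without any Flink dependency. In this sketch, PreflightBuilderSketch is hypothetical. The sets of argument names stand in for what Flink's type inference would extract from the eval() signature, and the sketch assumes every scalar argument is mandatory.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the builder's pre-flight checks; not the proposed implementation.
final class PreflightBuilderSketch {
    private final Set<String> scalarArgs;  // scalar argument names, as type inference would report them
    private final Set<String> tableArgs;   // table argument names
    private final Map<String, Object> scalarValues = new LinkedHashMap<>();
    private final Map<String, List<Integer>> partitionBy = new LinkedHashMap<>();

    PreflightBuilderSketch(Set<String> scalarArgs, Set<String> tableArgs) {
        this.scalarArgs = scalarArgs;
        this.tableArgs = tableArgs;
    }

    PreflightBuilderSketch withScalarArgument(String name, Object value) {
        scalarValues.put(name, value);
        return this;
    }

    PreflightBuilderSketch withPartitionBy(String tableArg, Integer... positions) {
        partitionBy.put(tableArg, List.of(positions));
        return this;
    }

    // Validates the whole configuration at once, before any element is processed.
    Map<String, Object> build() {
        for (String name : scalarValues.keySet()) {
            if (!scalarArgs.contains(name)) {
                throw new IllegalArgumentException("Unknown scalar argument: " + name);
            }
        }
        for (String name : scalarArgs) {
            if (!scalarValues.containsKey(name)) {  // assumes all scalar arguments are mandatory
                throw new IllegalArgumentException("Missing scalar argument: " + name);
            }
        }
        for (String name : partitionBy.keySet()) {
            if (!tableArgs.contains(name)) {
                throw new IllegalArgumentException("PARTITION BY on unknown table argument: " + name);
            }
        }
        return Map.copyOf(scalarValues);
    }
}
```

Because validation happens in build(), a misspelled argument name fails the test during setup with a message naming the argument. Without it, the same mistake would only surface as a less obvious error during processing.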
State and Multi-Input
State introspection is a key feature for testing stateful PTFs. The harness exposes methods to directly inspect and manipulate state, allowing tests to verify state transitions without relying on output messages.
The harness can be initialised with a given state, or the state can be built up over the course of the test run.
For partitioned PTFs, users can inspect the state of specific keys using getStateForKey(), or retrieve the state of all partitions via getAllState().
Multi-input PTFs are supported by allowing users to configure static fixture data for some table arguments while processing dynamic data for others, so that users can test scenarios in which a PTF acts as a join.
public static class GreetingWithMemory extends ProcessTableFunction<String> {
public static class CountState {
public long counter = 0L;
}
public void eval(
@StateHint CountState countState,
@ArgumentHint(SET_SEMANTIC_TABLE) Row input) {
countState.counter++;
collect("Hello " + input.getFieldAs(0) + ", visit #" + countState.counter);
}
}
@Test
void testStatefulFunction() throws Exception {
GreetingWithMemory.CountState initialState = new GreetingWithMemory.CountState();
initialState.counter = 10;
ProcessTableFunctionTestHarness<String> harness =
ProcessTableFunctionTestHarness.ofClass(GreetingWithMemory.class)
.withTableArgumentType("input", DataTypes.of("ROW<user STRING>"))
.withPartitionBy("input", 0) // Partition by first column (user ID)
.withInitialStateArgument("countState", Row.of("Alice"), initialState)
.build();
harness.processElement("Alice");
// Verify output
List<String> output = harness.getOutput();
assertThat(output).hasSize(1);
assertThat(output.get(0)).isEqualTo("Hello Alice, visit #11");
// Verify state was updated
GreetingWithMemory.CountState currentState =
harness.getStateForKey("countState", Row.of("Alice"), GreetingWithMemory.CountState.class);
assertThat(currentState.counter).isEqualTo(11);
}
public static class PerUserCounter extends ProcessTableFunction<String> {
public static class CountState {
public long counter = 0L;
}
public void eval(
@StateHint CountState countState,
@ArgumentHint(SET_SEMANTIC_TABLE) Row input) {
countState.counter++;
collect(input.getFieldAs(0) + ": " + countState.counter);
}
}
@Test
void testPartitionedState() throws Exception {
ProcessTableFunctionTestHarness<String> harness =
ProcessTableFunctionTestHarness.ofClass(PerUserCounter.class)
.withTableArgumentType("input", DataTypes.of("ROW<user STRING, action STRING>"))
.withPartitionBy("input", 0) // Partition by first column (user ID)
.build();
harness.processElement("alice", "action1");
harness.processElement("bob", "action2");
harness.processElement("alice", "action3");
// Check state for specific keys
PerUserCounter.CountState aliceState =
harness.getStateForKey("countState", Row.of("alice"), PerUserCounter.CountState.class);
assertThat(aliceState.counter).isEqualTo(2);
PerUserCounter.CountState bobState =
harness.getStateForKey("countState", Row.of("bob"), PerUserCounter.CountState.class);
assertThat(bobState.counter).isEqualTo(1);
// Get all state
Map<Row, PerUserCounter.CountState> allState =
harness.getAllState("countState", PerUserCounter.CountState.class);
assertThat(allState).hasSize(2);
}
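A harness could keep partitioned state as a plain map from partition key to state object, which is what getStateForKey() and getAllState() would read from. The sketch below is hypothetical and models rows and keys as List<Object> instead of Row. Replaying the three elements from testPartitionedState against it yields a count of 2 for alice and 1 for bob.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical in-memory model of per-partition state; keys are List<Object> instead of Row.
final class PartitionedStateSketch<S> {
    private final Supplier<S> initialState;  // creates the empty state for a new partition
    private final int[] keyPositions;        // column positions, as passed to withPartitionBy
    private final Map<List<Object>, S> stateByKey = new HashMap<>();

    PartitionedStateSketch(Supplier<S> initialState, int... keyPositions) {
        this.initialState = initialState;
        this.keyPositions = keyPositions;
    }

    // Projects the partition key columns out of an input row.
    List<Object> keyOf(List<?> row) {
        List<Object> key = new ArrayList<>();
        for (int pos : keyPositions) {
            key.add(row.get(pos));
        }
        return key;
    }

    // State handed to eval() for this row's partition, created on first access.
    S stateFor(List<?> row) {
        return stateByKey.computeIfAbsent(keyOf(row), k -> initialState.get());
    }

    // Mirrors getStateForKey(): null if the partition has no state.
    S getStateForKey(List<?> key) {
        return stateByKey.get(key);
    }

    // Mirrors getAllState(): an immutable copy keyed by partition key.
    Map<List<Object>, S> getAllState() {
        return Map.copyOf(stateByKey);
    }
}
```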
public static class SessionTracker extends ProcessTableFunction<String> {
public static class SessionState {
public String lastAction;
}
public void eval(
@StateHint(ttl = "5 min") SessionState state, // 5 minute TTL
@ArgumentHint(SET_SEMANTIC_TABLE) Row input) {
if (state.lastAction == null) {
collect("New session for " + input.getFieldAs("user"));
} else {
collect("Continuing session for " + input.getFieldAs("user"));
}
state.lastAction = input.getFieldAs("action");
}
}
@Test
void testStateTTL() throws Exception {
ProcessTableFunctionTestHarness<String> harness =
ProcessTableFunctionTestHarness.ofClass(SessionTracker.class)
.withTableArgumentType("input", DataTypes.of("ROW<ts TIMESTAMP(3), user STRING, action STRING>"))
.withPartitionBy("input", "user")
.withOnTimeColumn("ts")
.build();
Row row = Row.withNames();
row.setField("ts", LocalDateTime.of(1970, 1, 1, 0, 0, 1));
row.setField("user", "alice");
row.setField("action", "login");
harness.processElement(row);
// Verify state is populated
SessionTracker.SessionState state = harness.getStateForKey("state", Row.of("alice"), SessionTracker.SessionState.class);
assertThat(state).isNotNull();
assertThat(state.lastAction).isEqualTo("login");
// Advance system clock beyond TTL
harness.advanceSystemClock(Duration.ofMinutes(6).toMillis());
// State should be cleared due to TTL
SessionTracker.SessionState stateAfterTTL =
harness.getStateForKey("state", Row.of("alice"), SessionTracker.SessionState.class);
assertThat(stateAfterTTL).isNull();
}
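The TTL behaviour exercised by testStateTTL can be modelled with a manually advanced clock. TtlStateSketch is hypothetical. It assumes, as with Flink's state TTL, that an entry has expired once its last write timestamp plus the TTL is at or before the current time. As a simplification, it removes expired entries eagerly whenever the clock advances.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of processing-time state TTL driven by a manually advanced clock.
final class TtlStateSketch<K, S> {
    private static final class Entry<V> {
        final V value;
        final long lastWriteMillis;

        Entry(V value, long lastWriteMillis) {
            this.value = value;
            this.lastWriteMillis = lastWriteMillis;
        }
    }

    private final long ttlMillis;
    private final Map<K, Entry<S>> entries = new HashMap<>();
    private long nowMillis;  // the harness's system clock

    TtlStateSketch(long ttlMillis, long initialClockMillis) {
        this.ttlMillis = ttlMillis;
        this.nowMillis = initialClockMillis;
    }

    // Every write records the current clock value, restarting the TTL for that key.
    void put(K key, S value) {
        entries.put(key, new Entry<>(value, nowMillis));
    }

    S get(K key) {
        Entry<S> entry = entries.get(key);
        return entry == null ? null : entry.value;
    }

    // Advances the clock and drops entries whose lastWrite + ttl is at or before the new time.
    void advanceSystemClock(long timestampMillis) {
        if (timestampMillis < nowMillis) {
            throw new IllegalArgumentException("The system clock cannot move backwards");
        }
        nowMillis = timestampMillis;
        entries.values().removeIf(e -> e.lastWriteMillis + ttlMillis <= nowMillis);
    }
}
```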
public static class GreetingWithLastPurchase extends ProcessTableFunction<String> {
public static class LastItemState {
public String lastItem;
}
public void eval(
@StateHint LastItemState state,
@ArgumentHint(SET_SEMANTIC_TABLE) Row visit,
@ArgumentHint(SET_SEMANTIC_TABLE) Row purchase) {
// Process row from Purchases table (a dynamic row or a static fixture row)
if (purchase != null) {
state.lastItem = purchase.getFieldAs(1);
}
// Process row from Visits table
if (visit != null) {
if (state.lastItem == null) {
collect("Hello " + visit.getFieldAs(0) + ", let me know if I can help!");
} else {
collect("Hello " + visit.getFieldAs(0) +
", here to buy " + state.lastItem + " again?");
}
}
}
}
@Test
void testMultiInputWithStaticData() throws Exception {
// Set up harness with static fixture data for the purchase table
ProcessTableFunctionTestHarness<String> harness =
ProcessTableFunctionTestHarness.ofClass(GreetingWithLastPurchase.class)
.withTableArgumentType("visit", DataTypes.of("ROW<user STRING>"))
.withTableArgumentType("purchase", DataTypes.of("ROW<user STRING, item STRING>"))
.withPartitionBy("visit", 0)
.withTableArgument("purchase", Row.of("Alice", "a brand new stream processor"))
.build();
// When processing a visit, the purchase argument carries the static fixture row
harness.processElementForTable("visit", Row.of("Alice"));
assertThat(harness.getOutput()).hasSize(1);
assertThat(harness.getOutput().get(0))
.contains("a brand new stream processor");
// The fixture row is passed again for every subsequent visit
harness.processElementForTable("visit", Row.of("Alice"));
assertThat(harness.getOutput()).hasSize(2);
assertThat(harness.getOutput().get(1)).contains("a brand new stream processor");
}
@Test
void testMultiInputDynamicOnly() throws Exception {
// Both tables receive dynamic data, so no static table arguments are declared
ProcessTableFunctionTestHarness<String> harness =
ProcessTableFunctionTestHarness.ofClass(GreetingWithLastPurchase.class)
.withTableArgumentType("visit", DataTypes.of("ROW<user STRING>"))
.withTableArgumentType("purchase", DataTypes.of("ROW<user STRING, item STRING>"))
.withPartitionBy("visit", 0)
.withPartitionBy("purchase", 0)
.build();
// Process a visit first: no purchases yet (the purchase argument will be null)
harness.processElementForTable("visit", Row.of("Alice"));
assertThat(harness.getOutput().get(0)).contains("let me know if I can help");
// Process a purchase in the same partition (user)
harness.processElementForTable("purchase", "Alice", "headphones");
// Process another visit by the same user: the state should remember the purchase
harness.processElementForTable("visit", Row.of("Alice"));
assertThat(harness.getOutput().get(1)).contains("headphones");
}
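The argument assembly described for withTableArgument() works as follows. The table that produced the element receives it, a table configured with a single static fixture row receives that row, and every other table argument receives null. MultiInputDispatchSketch is hypothetical and models rows as plain Objects. It only covers the single-static-row case, not multiple fixture rows delivered with ROW_SEMANTIC_TABLE or SET_SEMANTIC_TABLE semantics.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of how eval() table arguments could be assembled for a multi-input PTF.
final class MultiInputDispatchSketch {
    private final List<String> tableArgs;  // in eval() parameter order
    private final Map<String, Object> staticRows = new LinkedHashMap<>();

    MultiInputDispatchSketch(String... tableArgs) {
        this.tableArgs = Arrays.asList(tableArgs);
    }

    MultiInputDispatchSketch withTableArgument(String name, Object fixtureRow) {
        if (!tableArgs.contains(name)) {
            throw new IllegalArgumentException("Unknown table argument: " + name);
        }
        staticRows.put(name, fixtureRow);
        return this;
    }

    // Table arguments for one eval() call triggered by an element of table `source`.
    List<Object> evalArguments(String source, Object element) {
        if (!tableArgs.contains(source)) {
            throw new IllegalArgumentException("Unknown table argument: " + source);
        }
        List<Object> args = new ArrayList<>();
        for (String name : tableArgs) {
            // The source table gets the element; others get their fixture row, or null if none.
            args.add(name.equals(source) ? element : staticRows.get(name));
        }
        return args;
    }
}
```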
Time & Timers
The harness will provide explicit control over event time progression through advanceWatermark(), which advances the watermark and fires any timers scheduled at or before the new watermark.
Timer introspection is exposed to the user, allowing tests to verify that timers are registered correctly and fire at the expected times. Each timer tracks its timestamp, its optional name, its partition key, and whether it has fired.
public static class GreetingWithFollowUp extends ProcessTableFunction<String> {
public static class StayState {
public String name;
public long counter = 0L;
}
public void eval(
Context ctx,
@StateHint StayState state,
@ArgumentHint(SET_SEMANTIC_TABLE) Row input) {
state.name = input.getFieldAs("user");
state.counter++;
collect("Hello " + state.name + ", visit #" + state.counter);
// Register a follow-up timer for 1 minute later
TimeContext<Instant> timeCtx = ctx.timeContext(Instant.class);
timeCtx.registerOnTime(timeCtx.time().plus(Duration.ofMinutes(1)));
}
public void onTimer(@StateHint StayState state) {
collect("Hello " + state.name + ", I hope you enjoyed your stay! " +
"Please let us know if there's anything we could have done better.");
}
}
@Test
void testEventTimeTimer() throws Exception {
ProcessTableFunctionTestHarness<String> harness =
ProcessTableFunctionTestHarness.ofClass(GreetingWithFollowUp.class)
.withTableArgumentType("input", DataTypes.of("ROW<ts TIMESTAMP(3), user STRING>"))
.withPartitionBy("input", "user")
.withOnTimeColumn("ts")
.build();
Row row = Row.withNames();
row.setField("ts", LocalDateTime.of(1970, 1, 1, 0, 0, 1)); // 1000 ms after the epoch, read as UTC
row.setField("user", "Alice");
harness.processElement(row);
// One output from eval()
assertThat(harness.getOutput()).hasSize(1);
assertThat(harness.getOutput().get(0)).contains("Hello Alice");
// One pending timer
List<ProcessTableFunctionTestHarness.Timer> pending = harness.getPendingTimers();
assertThat(pending).hasSize(1);
// Advance watermark to trigger the timer
harness.advanceWatermark(1000L + Duration.ofMinutes(1).toMillis() + 1);
// Timer should have fired
List<ProcessTableFunctionTestHarness.Timer> fired = harness.getFiredTimers();
assertThat(fired).hasSize(1);
assertThat(harness.getPendingTimers()).isEmpty();
// Should have additional output from onTimer()
assertThat(harness.getOutput()).hasSize(2);
assertThat(harness.getOutput().get(1)).contains("I hope you enjoyed your stay");
}
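The bookkeeping behind advanceWatermark(), getPendingTimers() and getFiredTimers() can be sketched with a priority queue ordered by timestamp. TimerServiceSketch is hypothetical and does not call a PTF. It also assumes, as Flink's multi-input operators do, that the effective watermark is the minimum over all table inputs. Under that assumption, advancing a single table's watermark fires nothing until the other inputs catch up.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical model of event-time timer bookkeeping; effective watermark = min over inputs.
final class TimerServiceSketch {
    static final class Timer {
        final long timestamp;
        final String name;  // null for unnamed timers
        final Object key;   // partition key the timer belongs to
        boolean fired;

        Timer(long timestamp, String name, Object key) {
            this.timestamp = timestamp;
            this.name = name;
            this.key = key;
        }
    }

    private final Map<String, Long> watermarks = new LinkedHashMap<>();
    private final PriorityQueue<Timer> pending =
            new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.timestamp));
    private final List<Timer> fired = new ArrayList<>();

    TimerServiceSketch(String... tableArgs) {
        for (String tableArg : tableArgs) {
            watermarks.put(tableArg, Long.MIN_VALUE);  // no watermark received yet
        }
    }

    void registerOnTime(long timestamp, String name, Object key) {
        pending.add(new Timer(timestamp, name, key));
    }

    long currentWatermark() {
        return watermarks.values().stream().mapToLong(Long::longValue).min().orElse(Long.MIN_VALUE);
    }

    // Advances one input's watermark (never backwards) and fires all due timers.
    List<Timer> advanceWatermark(String tableArg, long timestamp) {
        if (!watermarks.containsKey(tableArg)) {
            throw new IllegalArgumentException("Unknown table argument: " + tableArg);
        }
        watermarks.merge(tableArg, timestamp, Long::max);
        return fireDueTimers();
    }

    // Advances the watermark of every input.
    List<Timer> advanceWatermark(long timestamp) {
        watermarks.replaceAll((tableArg, current) -> Math.max(current, timestamp));
        return fireDueTimers();
    }

    private List<Timer> fireDueTimers() {
        long watermark = currentWatermark();
        List<Timer> firedNow = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek().timestamp <= watermark) {
            Timer timer = pending.poll();
            timer.fired = true;  // a real harness would invoke the PTF's onTimer() here
            firedNow.add(timer);
        }
        fired.addAll(firedNow);
        return firedNow;
    }

    List<Timer> getPendingTimers() {
        return new ArrayList<>(pending);  // PriorityQueue iteration order is unspecified
    }

    List<Timer> getFiredTimers() {
        return List.copyOf(fired);
    }
}
```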
Test Snapshotting and Recreation
To allow for reusable testing scenarios, the harness also supports snapshotting a test's state and restoring it. The TestSnapshot will contain a reference to the PTF class, per-partition state data, timers (both pending and fired), watermarks per table, the system clock, output rows, and harness metadata such as scalar arguments, table arguments, and partition columns.
@DataTypeHint("ROW<name STRING>")
public static class ThresholdFilterFunction extends ProcessTableFunction<Row> {
public void eval(
@ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row row,
Integer threshold) {
Integer value = row.getFieldAs(1);
if (value >= threshold) {
collect(Row.of(row.getField(0)));
}
}
}
@Test
void testSnapshotAndRestore() throws Exception {
ProcessTableFunctionTestHarness<Row> harness =
ProcessTableFunctionTestHarness.ofClass(ThresholdFilterFunction.class)
.withTableArgumentType("row", DataTypes.of("ROW<user STRING, value INT>"))
.withScalarArgument("threshold", 50)
.build();
harness.processElement("Alice", 100);
harness.processElement("Bob", 30);
List<Row> output = harness.getOutput();
assertThat(output).hasSize(1);
ProcessTableFunctionTestHarness.TestSnapshot snapshot = harness.snapshot();
harness.processElement("Charlie", 75);
output = harness.getOutput();
assertThat(output).hasSize(2);
harness.close();
harness = ProcessTableFunctionTestHarness.ofClass(ThresholdFilterFunction.class)
.restoreFromSnapshot(snapshot)
.build();
output = harness.getOutput();
assertThat(output).hasSize(1);
}
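The key property of snapshot() is that it takes a deep copy, so processing after the snapshot cannot leak into it. That is why the restored harness above reports a single output row. SnapshotSketch is a hypothetical, heavily reduced model. State is a per-user Long counter, so copying the map is enough for a deep copy, and timers and arguments are omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of test-state snapshotting; not the proposed TestSnapshot format.
final class SnapshotSketch {
    // Immutable copy of the harness state at snapshot time.
    static final class TestSnapshot {
        final List<String> output;
        final Map<String, Long> state;
        final long watermark;
        final long systemClock;

        TestSnapshot(List<String> output, Map<String, Long> state, long watermark, long systemClock) {
            this.output = List.copyOf(output);
            this.state = Map.copyOf(state);
            this.watermark = watermark;
            this.systemClock = systemClock;
        }
    }

    private final List<String> output = new ArrayList<>();
    private final Map<String, Long> state = new HashMap<>();
    private long watermark = Long.MIN_VALUE;
    private long systemClock = 0L;

    static SnapshotSketch restoreFromSnapshot(TestSnapshot snapshot) {
        SnapshotSketch harness = new SnapshotSketch();
        harness.output.addAll(snapshot.output);
        harness.state.putAll(snapshot.state);
        harness.watermark = snapshot.watermark;
        harness.systemClock = snapshot.systemClock;
        return harness;
    }

    // Stand-in for processElement(): counts visits per user and emits a greeting.
    void processElement(String user) {
        long count = state.merge(user, 1L, Long::sum);
        output.add("Hello " + user + ", visit #" + count);
    }

    TestSnapshot snapshot() {
        return new TestSnapshot(output, state, watermark, systemClock);
    }

    List<String> getOutput() {
        return List.copyOf(output);
    }

    Long getStateForKey(String user) {
        return state.get(user);
    }
}
```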
Out Of Scope (or Stretch Goals)
Handling parallelism in the way the existing operator test harnesses do is out of scope for this FLIP, but could be revisited if it proves crucial for certain test cases.
Compatibility, Deprecation, and Migration Plan
As this is a new API, there are no compatibility concerns. The API will be marked as @Public (or @PublicEvolving).
Test Plan
To ensure that the harness covers the behaviour users would want to test during PTF development, we will use it to cover the PTF test cases described in ProcessTableFunctionTestPrograms.
Rejected Alternatives
Extending existing internal test harnesses
Extending the existing test harnesses, such as OneInputStreamOperatorTestHarness, was rejected. They are geared towards other UDF use cases, and extending them to support PTFs would likely be a disruptive lift. They are also more internally focused, whereas this harness aims to be focused on users and PTF developers.