Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
In DataStream API V2, users define data processing logic using ProcessFunction. The framework can optimize jobs based on the characteristics of this logic. Here are three practical examples:
Scenario 1: The user uses a ProcessFunction for batch computations, like sorting received data. In this case, the ProcessFunction only outputs the sorted data after all input is received. If the framework is aware of this, it can schedule downstream operators after the ProcessFunction completes, reducing resource consumption. Ignoring this information could lead to untimely scheduling of downstream operators with no data to process. FLIP-331 addresses and optimizes this scenario.
Scenario 2: The user uses a ProcessFunction for stream processing but is only interested in the latest result for each key at any given time. Knowing this, the framework can adjust the frequency of ProcessFunction outputs, reducing shuffle data volume and downstream operator workload. If this information is ignored, every input generates a new output. FLIP-365 addresses and optimizes this scenario.
Scenario 3: The user's ProcessFunction doesn't cache input or output data. Recognizing this, the framework can enable object reuse for this data within the OperatorChain, enhancing performance. Otherwise, data would be copied before being passed to the next operator. FLIP-329 addresses and optimizes this scenario.
To establish a unified mechanism for utilizing additional information and optimizing jobs, we propose the ProcessFunction Attribute. Each piece of additional information corresponds to a Java annotation. Users can annotate ProcessFunctions with this information, and the framework can recognize and utilize it for job optimization. Importantly, the job's regular execution is unaffected whether or not users utilize the ProcessFunction Attribute.
Proposed Changes
We will introduce the changes in the following steps:
1. We'll present the three new annotations in the ProcessFunction Attribute, including their definitions, potential framework optimization behaviors, and example job code.
2. We'll explain the implementation design of the ProcessFunction Attribute, including how the attributes are parsed, adjusted, and used.
ProcessFunction Attributes
1. @NoOutputUntilEndOfInput
Definition:
/**
 * {@link NoOutputUntilEndOfInput} indicates that the process function will only output
 * records after all inputs have ended. If this annotation is applied to a process function
 * with an unbounded source, an exception is thrown during job compilation.
 */
@Experimental
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface NoOutputUntilEndOfInput {}
This annotation indicates that the ProcessFunction only outputs results after all of its inputs have ended (this applies to both single-input and multi-input functions). During the job compilation phase, the framework checks whether the data source of the annotated ProcessFunction is bounded. If it is unbounded, an exception is thrown.
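The boundedness check described above could look roughly like the following sketch. The annotation is redeclared locally so the example is self-contained. `BoundednessCheck`, `validate`, and the `sourceIsBounded` flag are hypothetical names used only for illustration; the real framework would derive boundedness from the source itself rather than take it as a parameter.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

class BoundednessCheck {

    // Local stand-in for the proposed annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface NoOutputUntilEndOfInput {}

    @NoOutputUntilEndOfInput
    static class SortingFunction {}

    static class PlainFunction {}

    /** Hypothetical helper: rejects annotated functions that read from an unbounded source. */
    static void validate(Class<?> functionClass, boolean sourceIsBounded) {
        boolean annotated = functionClass.isAnnotationPresent(NoOutputUntilEndOfInput.class);
        if (annotated && !sourceIsBounded) {
            throw new IllegalStateException(
                    functionClass.getSimpleName()
                            + " is annotated with @NoOutputUntilEndOfInput"
                            + " but reads from an unbounded source");
        }
    }

    public static void main(String[] args) {
        validate(SortingFunction.class, true);  // accepted: bounded source
        validate(PlainFunction.class, false);   // accepted: no annotation
        try {
            validate(SortingFunction.class, false);
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Because the annotation uses RUNTIME retention, a reflective lookup of this kind is enough to detect it; no annotation processor is required.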
Optimization Behavior:
The framework may schedule downstream operators only after the ProcessFunction completes.
Job Code Example:
A SortPartitionFunction collects and sorts data, outputting the sorted data only after all input is received. Annotating it with @NoOutputUntilEndOfInput allows the framework to schedule downstream operators after the SortPartitionFunction finishes.
/**
 * The {@link SortPartitionFunction} is used to sort all received records.
 */
@NoOutputUntilEndOfInput
public class SortPartitionFunction implements OneInputStreamProcessFunction<Integer, Integer> {

    static final StateDeclaration.ListStateDeclaration<Integer> LIST_STATE_DECLARATION =
            StateDeclarations.listState("numbers", Types.INT);

    @Override
    public void processRecord(Integer record, Collector<Integer> output, RuntimeContext ctx) {
        // Buffer every record in list state; nothing is emitted until the input ends.
        Optional<ListState<Integer>> state = ctx.getStateManager().getState(LIST_STATE_DECLARATION);
        state.get().add(record);
    }

    @Override
    public void endInput(NonPartitionedContext<Integer> ctx) {
        // Optional#get() unwraps the state; ListState#get() returns the buffered records.
        Iterator<Integer> iterator =
                ctx.getStateManager().getState(LIST_STATE_DECLARATION).get().get().iterator();
        // A PriorityQueue yields its elements in ascending order when polled.
        Queue<Integer> numbers = new PriorityQueue<>();
        while (iterator.hasNext()) {
            numbers.add(iterator.next());
        }
        ctx.applyToAllPartitions(
                (collector, context) -> {
                    while (!numbers.isEmpty()) {
                        collector.collect(numbers.poll());
                    }
                });
    }

    @Override
    public Set<StateDeclaration> usesStates() {
        return Collections.singleton(LIST_STATE_DECLARATION);
    }
}
2. @Idempotence
Definition:
/**
* {@link Idempotence} indicates that each new output from the ProcessFunction represents the
* final result at the time of output.
*/
@Experimental
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface Idempotence {}
This annotation signifies that each new output from the ProcessFunction represents the final result at the time of output.
Optimization Behavior:
The framework can adjust the frequency or number of outputs from the ProcessFunction, reducing shuffle data and downstream operator workload, thereby improving performance.
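One way a framework could exploit this property is to keep only the most recent output per key and emit it at a flush boundary. The sketch below illustrates the idea in plain Java. `CoalescingBuffer`, `collect`, and `flush` are hypothetical names, and the flush trigger (a timer, a checkpoint, a size limit) is deliberately left open because the proposal does not specify one.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class CoalescingBuffer<K, V> {

    // Holds only the most recent value seen for each key since the last flush.
    private final Map<K, V> latestByKey = new LinkedHashMap<>();

    /** Records an output; an earlier pending value for the same key is overwritten. */
    void collect(K key, V value) {
        latestByKey.put(key, value);
    }

    /** Returns the pending outputs (one per key) and clears the buffer. */
    Map<K, V> flush() {
        Map<K, V> pending = new LinkedHashMap<>(latestByKey);
        latestByKey.clear();
        return pending;
    }

    public static void main(String[] args) {
        CoalescingBuffer<String, Integer> buffer = new CoalescingBuffer<>();
        buffer.collect("a", 1);
        buffer.collect("a", 2);
        buffer.collect("b", 1);
        buffer.collect("a", 3);
        // Four outputs were produced, but only the latest per key is forwarded.
        System.out.println(buffer.flush()); // prints {a=3, b=1}
    }
}
```

Dropping the intermediate values is only safe because each output already represents the final result at the time it was produced, which is exactly what @Idempotence asserts.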
Job Code Example:
A Counter counts the total number of received data points, with each output representing the count at the time of output. Annotating it with @Idempotence allows the framework to adjust the output frequency or number.
/**
 * The {@link Counter} is used to count the total number of received records.
 */
@Idempotence
public class Counter implements OneInputStreamProcessFunction<Integer, Integer> {

    static final StateDeclaration.ValueStateDeclaration<Integer> VALUE_STATE_DECLARATION =
            StateDeclarations.valueState("count", Types.INT);

    @Override
    public void processRecord(Integer record, Collector<Integer> output, RuntimeContext ctx) {
        ValueState<Integer> state = ctx.getStateManager().getState(VALUE_STATE_DECLARATION).get();
        // The state is empty before the first record arrives, so treat null as zero.
        Integer previous = state.value();
        int count = (previous == null ? 0 : previous) + 1;
        state.update(count);
        output.collect(count);
    }

    @Override
    public Set<StateDeclaration> usesStates() {
        return Collections.singleton(VALUE_STATE_DECLARATION);
    }
}
3. @NotCacheRecord
Definition:
/**
* The {@link NotCacheRecord} indicates that the process function doesn't cache
* any input and output data.
*/
@Experimental
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface NotCacheRecord {}
This annotation indicates that the ProcessFunction does not cache any input or output data.
Optimization Behavior:
In an OperatorChain, the framework can perform object reuse for data transferred between adjacent ProcessFunctions if they don't cache data, improving performance.
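The reason caching matters can be shown with a small plain-Java sketch. When a function keeps a reference to a record and a later function mutates that record in place, handing over the same object corrupts the cached value, while handing over a copy does not. All names here (`ObjectReuseDemo`, `CachingStage`, `convert`, `cachedPriceAfterPipeline`) are hypothetical and only loosely modeled on the job example in this section.

```java
class ObjectReuseDemo {

    static final class Order {
        double price;

        Order(double price) {
            this.price = price;
        }

        Order copy() {
            return new Order(price);
        }
    }

    /** A stage that keeps a reference to the record it emits. */
    static final class CachingStage {
        Order cached;

        Order process(Order in) {
            cached = in;
            return in;
        }
    }

    /** A stage that mutates the record in place and caches nothing. */
    static Order convert(Order in) {
        in.price = in.price / 10000;
        return in;
    }

    /** Runs caching stage then converter and returns the price the caching stage still holds. */
    static double cachedPriceAfterPipeline(boolean copyBetweenStages) {
        CachingStage stage = new CachingStage();
        Order emitted = stage.process(new Order(50000.0));
        Order handedOver = copyBetweenStages ? emitted.copy() : emitted;
        convert(handedOver);
        return stage.cached.price;
    }

    public static void main(String[] args) {
        System.out.println(cachedPriceAfterPipeline(true));  // prints 50000.0 (cache intact)
        System.out.println(cachedPriceAfterPipeline(false)); // prints 5.0 (cache corrupted)
    }
}
```

A function that caches nothing cannot be affected in this way, which is why the annotation is enough for the framework to skip the copy.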
Job Code Example:
An Order object represents a product order's price, and a LargestOrder function calculates the current highest-priced order, caching the output order data. CurrencyConverter and OrderInfoEmitter convert the order amount and output order information, respectively, without caching any data. Annotating CurrencyConverter and OrderInfoEmitter with @NotCacheRecord enables object reuse for data between them.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromSource(someSource)
        .process(new LargestOrder())
        .process(new CurrencyConverter())
        .process(new OrderInfoEmitter());

public class Order {
    public double price;
    // Constructor, Getter, Setter
    ...
}

// Not annotated: this function keeps a reference to the record it emitted.
public class LargestOrder implements OneInputStreamProcessFunction<Order, Order> {

    Order largestOrder = null;

    @Override
    public void processRecord(Order order, Collector<Order> output, RuntimeContext ctx) {
        if (largestOrder == null || order.getPrice() > largestOrder.getPrice()) {
            largestOrder = order;
            output.collect(order);
        }
    }
}

@NotCacheRecord
public class CurrencyConverter implements OneInputStreamProcessFunction<Order, Order> {

    @Override
    public void processRecord(Order order, Collector<Order> output, RuntimeContext ctx) {
        // Mutates the record in place, but holds no reference after returning.
        order.setPrice(order.getPrice() / 10000);
        output.collect(order);
    }
}

@NotCacheRecord
public class OrderInfoEmitter implements OneInputStreamProcessFunction<Order, String> {

    @Override
    public void processRecord(Order order, Collector<String> output, RuntimeContext ctx) {
        output.collect("Largest Order: " + order.getPrice());
    }
}
Next, we describe how the ProcessFunction Attribute mechanism works internally.
The ProcessFunction Attribute mechanism consists of three main stages: the parsing stage, the adjustment stage, and the usage stage, as illustrated in the following diagram:
Parsing Stage
This stage occurs during the conversion from ProcessFunction to StreamGraph. Its purpose is to convert the annotations on the ProcessFunction into parsing results that carry the additional information, and to provide them to the framework. Since both JobGraph and StreamTask require the additional information for optimization, parsing is done during StreamGraph generation to make it accessible to both. This stage includes the following steps:
1. Transformation and StreamNode each gain an instance field of type Attribute, with corresponding getter and setter methods. The Attribute holds the additional information from the ProcessFunction.
2. During the conversion from ProcessFunction to Transformation, the Java annotations are converted to an Attribute, which is stored in the Transformation's Attribute field.
3. During the conversion from Transformation to StreamNode, the Attribute from the Transformation is copied to the StreamNode's Attribute field.
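The three steps above can be sketched in plain Java as follows. This is a minimal illustration under assumptions: the proposal does not specify the fields of Attribute, so the three boolean flags, the `parse` helper, and the simplified `Transformation` and `StreamNode` classes are hypothetical stand-ins rather than the actual Flink classes.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

class AttributeParsing {

    // Local stand-ins for the three proposed annotations.
    @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) @interface NoOutputUntilEndOfInput {}
    @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) @interface Idempotence {}
    @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) @interface NotCacheRecord {}

    /** Hypothetical holder for the parsed information (fields are an assumption). */
    static final class Attribute {
        final boolean noOutputUntilEndOfInput;
        final boolean idempotent;
        final boolean notCacheRecord;

        Attribute(boolean noOutputUntilEndOfInput, boolean idempotent, boolean notCacheRecord) {
            this.noOutputUntilEndOfInput = noOutputUntilEndOfInput;
            this.idempotent = idempotent;
            this.notCacheRecord = notCacheRecord;
        }
    }

    /** Step 1: both graph elements carry an Attribute with getter and setter. */
    static final class Transformation {
        private Attribute attribute;
        Attribute getAttribute() { return attribute; }
        void setAttribute(Attribute attribute) { this.attribute = attribute; }
    }

    static final class StreamNode {
        private Attribute attribute;
        Attribute getAttribute() { return attribute; }
        void setAttribute(Attribute attribute) { this.attribute = attribute; }
    }

    /** Step 2: read the annotations reflectively and build the Attribute. */
    static Attribute parse(Class<?> functionClass) {
        return new Attribute(
                functionClass.isAnnotationPresent(NoOutputUntilEndOfInput.class),
                functionClass.isAnnotationPresent(Idempotence.class),
                functionClass.isAnnotationPresent(NotCacheRecord.class));
    }

    @NoOutputUntilEndOfInput
    @NotCacheRecord
    static class Sorter {}

    public static void main(String[] args) {
        Transformation transformation = new Transformation();
        transformation.setAttribute(parse(Sorter.class));

        // Step 3: the StreamNode takes over the Attribute of its Transformation.
        StreamNode node = new StreamNode();
        node.setAttribute(transformation.getAttribute());

        Attribute a = node.getAttribute();
        System.out.println(a.noOutputUntilEndOfInput + " " + a.idempotent + " " + a.notCacheRecord);
    }
}
```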
Adjustment Stage
The adjustment stage occurs during the conversion of the StreamGraph to the JobGraph. Since a Chain node in the JobGraph can contain multiple StreamNodes, and one StreamNode's attributes might affect others, we need to readjust the attributes of each StreamNode within the Chain node.
As shown in Figure (1), the gray marking indicates that the attributes of StreamNode2 contain information corresponding to @NoOutputUntilEndOfInput. This means StreamNode2 only outputs data after all input has been received, allowing JobVertex3 to only start after JobVertex1 finishes execution. Since StreamNode3 directly follows StreamNode2 and has only one upstream node, it will also only output data at the end of input. Therefore, JobVertex4 can also start execution after JobVertex1 completes. However, StreamNode1 is not downstream of StreamNode2, so JobVertex2 remains unaffected.
For each Chain, we readjust the attributes of its StreamNodes, generating the final attributes for each. As shown in Figure (2), we adjust StreamNode3 so its attributes also include the information corresponding to @NoOutputUntilEndOfInput.
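The adjustment for @NoOutputUntilEndOfInput can be sketched as a single pass over the nodes of a chain. The rule used here (a node inherits the flag when its only upstream node carries it) is a generalization of the StreamNode2/StreamNode3 case described above and is an assumption, as are the `ChainAdjustment`, `Node`, and `adjust` names. The proposal does not spell out the rules for the other attributes, so they are not modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ChainAdjustment {

    /** Simplified stand-in for a StreamNode inside one chain. */
    static final class Node {
        final String name;
        final List<Node> upstream = new ArrayList<>();
        boolean noOutputUntilEndOfInput;

        Node(String name, boolean noOutputUntilEndOfInput) {
            this.name = name;
            this.noOutputUntilEndOfInput = noOutputUntilEndOfInput;
        }
    }

    /**
     * Propagates the flag downstream. The list must be in topological order so that
     * every upstream node has been finalized before its successors are visited.
     */
    static void adjust(List<Node> chainInTopologicalOrder) {
        for (Node node : chainInTopologicalOrder) {
            boolean singleFlaggedUpstream =
                    node.upstream.size() == 1 && node.upstream.get(0).noOutputUntilEndOfInput;
            if (singleFlaggedUpstream) {
                node.noOutputUntilEndOfInput = true;
            }
        }
    }

    public static void main(String[] args) {
        Node node1 = new Node("StreamNode1", false);
        Node node2 = new Node("StreamNode2", true);  // annotated by the user
        Node node3 = new Node("StreamNode3", false); // directly follows StreamNode2
        node3.upstream.add(node2);

        adjust(Arrays.asList(node1, node2, node3));

        for (Node n : Arrays.asList(node1, node2, node3)) {
            System.out.println(n.name + " -> " + n.noOutputUntilEndOfInput);
        }
    }
}
```

After the pass, StreamNode3 carries the flag and StreamNode1 does not, matching the outcome described for Figure (2).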
Usage Stage
This stage occurs within JobGraph and StreamTask, which use the adjusted attributes to optimize the job. Here are examples based on the three ProcessFunction Attributes:
1. @NoOutputUntilEndOfInput: JobGraph can optimize the scheduling behavior of downstream nodes for the corresponding ProcessFunction.
2. @Idempotence: StreamTask can optimize the number of outputs or the frequency of output for the StreamOperator containing the corresponding ProcessFunction.
3. @NotCacheRecord: StreamTask can perform object reuse for data output by the corresponding ProcessFunction within the OperatorChain.
Public Interfaces
We will add three new Java annotations, which have been explained and illustrated in the previous sections:
1. @NoOutputUntilEndOfInput
2. @Idempotence
3. @NotCacheRecord
Compatibility, Deprecation, and Migration Plan
1. Everything described in this FLIP is new API and does not raise compatibility issues.
2. The public interfaces proposed in this FLIP will be annotated with @Experimental.
Test Plan
We will provide unit and integration tests to validate the proposed changes.

