Status


Discussion thread: https://lists.apache.org/thread/j8xg2mjpy7wxqbcfspbwm0zdx7d1dvkc
Vote thread
JIRA

Release


Motivation

The default behavior of the streaming runtime is to copy every element between chained operators.

That operation was introduced for “safety” reasons, to avoid cases where users create incorrect programs by reusing mutable objects (a discouraged pattern, but possible). For example, when using state backends that keep the state as objects on the heap, reusing mutable objects can theoretically create cases where the same object is used in multiple state mappings.
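The hazard described above can be reproduced without Flink. The following is a minimal sketch in plain Java: the `Counter` type and the `HashMap` standing in for heap-based state are hypothetical names chosen for illustration, not Flink classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical mutable record type, used only for illustration.
final class Counter {
    long value;

    Counter(long value) {
        this.value = value;
    }

    Counter copy() {
        return new Counter(value);
    }
}

final class MutableReuseHazard {

    // Stores the very same instance under both keys and mutates it in between,
    // as user code that reuses one mutable object might do.
    static Map<String, Counter> storeWithReuse() {
        Map<String, Counter> heapState = new HashMap<>(); // stand-in for on-heap state
        Counter reused = new Counter(0);
        reused.value = 1;
        heapState.put("a", reused);
        reused.value = 2; // also changes the entry already stored under "a"
        heapState.put("b", reused);
        return heapState;
    }

    // Stores a defensive copy per key, so the entries stay independent.
    static Map<String, Counter> storeWithCopy() {
        Map<String, Counter> heapState = new HashMap<>();
        Counter reused = new Counter(0);
        reused.value = 1;
        heapState.put("a", reused.copy());
        reused.value = 2;
        heapState.put("b", reused.copy());
        return heapState;
    }

    public static void main(String[] args) {
        System.out.println(storeWithReuse().get("a").value); // prints 2, not 1
        System.out.println(storeWithCopy().get("a").value);  // prints 1
    }
}
```

With reuse, both keys map to one object, so the later mutation silently overwrites the value stored first. Copying avoids this at the cost of one extra allocation per element.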

The effect of that “safety” mechanism is that many people who try or use Flink get much lower performance than they otherwise could. From empirical evidence, almost all users that I (Stephan) have been in touch with eventually run into this issue.


There are multiple observations about that design:

Public Interfaces

Interfaces changed

The interface of the ExecutionConfig adds the method setObjectReuseMode(ObjectReuseMode) and deprecates the methods enableObjectReuse() and disableObjectReuse().
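As a rough illustration of the shape of this change, the sketch below models it on a stand-alone class. `ExecutionConfigSketch` is a hypothetical stand-in, not Flink's real ExecutionConfig. The getter, the fluent return type, and in particular the modes that the two deprecated methods delegate to are assumptions made for this example; the proposal text here does not specify them.

```java
import java.util.Objects;

// Mode names as used in this proposal.
enum ObjectReuseMode {
    DEFAULT,
    COPY_PER_OPERATOR,
    FULL_REUSE
}

// Hypothetical stand-in for ExecutionConfig; not the real Flink class.
final class ExecutionConfigSketch {

    private ObjectReuseMode objectReuseMode = ObjectReuseMode.DEFAULT;

    // The new method named in this proposal.
    ExecutionConfigSketch setObjectReuseMode(ObjectReuseMode mode) {
        this.objectReuseMode = Objects.requireNonNull(mode, "mode");
        return this;
    }

    // Assumed accessor, added so the example can be inspected.
    ObjectReuseMode getObjectReuseMode() {
        return objectReuseMode;
    }

    /** @deprecated Use setObjectReuseMode. Delegating to FULL_REUSE is an assumption. */
    @Deprecated
    ExecutionConfigSketch enableObjectReuse() {
        return setObjectReuseMode(ObjectReuseMode.FULL_REUSE);
    }

    /** @deprecated Use setObjectReuseMode. Delegating to COPY_PER_OPERATOR is an assumption. */
    @Deprecated
    ExecutionConfigSketch disableObjectReuse() {
        return setObjectReuseMode(ObjectReuseMode.COPY_PER_OPERATOR);
    }
}
```

Existing callers of the deprecated methods keep compiling, and new code can select a mode explicitly, for example `new ExecutionConfigSketch().setObjectReuseMode(ObjectReuseMode.COPY_PER_OPERATOR)`.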


Behavior changed

The default object-passing behavior changes, which can affect the correctness of existing DataStream programs that assume the original “COPY_PER_OPERATOR” behavior (see below).


Proposed Changes

Summary

I propose to change the default behavior of the DataStream runtime to be the same as that of the DataSet runtime. That means that new objects are created on every deserialization, and no copies are made as the objects are passed along the pipeline.
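The difference between copying on every hand-over and passing the object along can be sketched with a toy operator chain. Everything below is hypothetical illustration code (`Event`, `OperatorChainSketch`, and the copy counter are invented names), and it only models the hand-over between chained operators, not the actual runtime.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical mutable element type.
final class Event {
    int payload;

    Event(int payload) {
        this.payload = payload;
    }

    Event copy() {
        return new Event(payload);
    }
}

// Toy model of a chain of operators running in one task.
final class OperatorChainSketch {

    private final boolean copyBetweenOperators;
    private int copiesMade = 0;

    OperatorChainSketch(boolean copyBetweenOperators) {
        this.copyBetweenOperators = copyBetweenOperators;
    }

    // The first operator receives the freshly deserialized element.
    // Every later hand-over either copies the element or passes it as-is.
    Event process(Event deserialized, List<UnaryOperator<Event>> operators) {
        Event current = deserialized;
        boolean first = true;
        for (UnaryOperator<Event> operator : operators) {
            if (!first && copyBetweenOperators) {
                current = current.copy();
                copiesMade++;
            }
            current = operator.apply(current);
            first = false;
        }
        return current;
    }

    int copiesMade() {
        return copiesMade;
    }
}
```

For a chain of three operators, the copying variant makes two copies per element, while the pass-through variant makes none and hands the same instance to every operator. That saving is the motivation for the proposed default, and it is also why programs that mutate or retain their inputs can behave differently under it.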


Details

I propose to drop the execution config flag objectReuse and instead introduce an ObjectReuseMode enumeration that gives finer control over how objects are passed. There will be three modes:




An illustration of the modes is as follows:


DEFAULT

regular_mode.png


COPY_PER_OPERATOR

copy_mode.png


FULL_REUSE

reuse_mode.png



Compatibility, Deprecation, and Migration Plan

Interfaces

No interface migration path is needed, because the interfaces are not broken; some methods are merely deprecated.


Behavior change

We have two variants:


Variant 1:

Variant 2:


Rejected Alternatives

None so far...