Status
Motivation
As described in FLIP-131, we are aiming at deprecating the DataSet API in favour of the DataStream API and the Table API. Users should be able to write a program using the DataStream API that will execute efficiently on both bounded and unbounded input data. To understand what we mean by this we have to elaborate a bit.
We refer to a data source as unbounded if it will continuously produce data and will never shut down. A bounded source, on the other hand, will only read a finite amount of data and will eventually shut down. A Flink job/program that includes an unbounded source will be unbounded, while a job that only contains bounded sources will be bounded and will eventually finish. Traditionally, processing systems have been optimized for either bounded or unbounded execution: they are either a batch processor or a stream processor. The reason is that a framework can use different runtime algorithms and data structures depending on the nature of the computation. Stream processors are optimized for continuous/incremental computation and can provide results fast (with low latency), while batch processors are optimized for processing a whole batch of data quickly but do not provide low-latency updates for single events/records. Flink can be used for both batch and stream processing, but users need to use the DataSet API for the former and the DataStream API for the latter.
Users can use the DataStream API to write bounded programs but, currently, the runtime will not know that a program is bounded and will not take advantage of this when "deciding" how the program should be executed. This includes the choice of how operations are scheduled and how data is shuffled between operations.
Proposed Changes
We propose to add a new BATCH execution mode for the DataStream API that will allow users to write bounded programs that are executed in an efficient manner that takes advantage of the bounded nature of the program. The runtime execution semantics should be comparable to those of the DataSet API for equivalent programs. The current execution mode should be retroactively called STREAMING execution mode.
Some of the operations and semantics of the DataStream API will not work well for batch execution. We discuss these below and propose semantics and behaviour changes that we will need for batch execution with the DataStream API. Below, we will also discuss the details of the proposal and give a summary of the proposed semantics at the end.
BATCH vs. STREAMING and BOUNDED vs. UNBOUNDED
In this proposal we sometimes use the terms BATCH and STREAMING and sometimes BOUNDED and UNBOUNDED. The terms might seem mostly interchangeable, but we think there is a subtle difference and use them for slightly different purposes. In the API/SDK we should use the terms BOUNDED and UNBOUNDED: a source can be bounded or unbounded, and a Flink job can be bounded or unbounded. For the runtime behaviour we suggest using BATCH and STREAMING. The main reason for this is that it can make sense to execute bounded programs with different execution semantics. For example, it can make sense to execute a bounded program in STREAMING mode if a user wants to get the execution semantics of the DataStream API as they were before this proposal.
Configuring the new execution mode
We propose to introduce a new setting called execution.runtime-mode that has three possible values:
- BATCH: Select the new batch runtime mode. This is only valid if all sources are bounded.
- STREAMING: Select streaming runtime mode. This is the execution behaviour that the DataStream API currently exhibits, and we will retroactively refer to it as streaming mode once this FLIP takes effect.
- AUTOMATIC: Choose the execution mode based on the sources in the job/program. If all sources are bounded choose BATCH, otherwise choose STREAMING.
Ideally, the default setting would be AUTOMATIC but we think that before Flink 2.0 the default should be STREAMING so that we don't break existing setups: Some already existing sources in the DataStream API are bounded, e.g. fromCollection() or readFile() with FileProcessingMode set to PROCESS_ONCE so having AUTOMATIC as a default would switch on BATCH execution for them, with potentially surprising behaviour.
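The resolution rules for the three values can be sketched in plain Java. This is a minimal illustration and not Flink code: the class RuntimeModeSketch, the Boundedness enum, and the resolve method are hypothetical names introduced only for this example.

```java
import java.util.List;

/** Hypothetical model of the proposed execution.runtime-mode setting; not Flink API. */
final class RuntimeModeSketch {

    enum Boundedness { BOUNDED, UNBOUNDED }

    enum RuntimeMode { BATCH, STREAMING, AUTOMATIC }

    /** Resolves the configured mode to the mode the job would actually run in. */
    static RuntimeMode resolve(RuntimeMode configured, List<Boundedness> sources) {
        boolean allBounded = sources.stream().allMatch(s -> s == Boundedness.BOUNDED);
        switch (configured) {
            case BATCH:
                // BATCH is only valid if all sources are bounded.
                if (!allBounded) {
                    throw new IllegalStateException("BATCH mode requires all sources to be bounded");
                }
                return RuntimeMode.BATCH;
            case AUTOMATIC:
                // AUTOMATIC derives the mode from the sources.
                return allBounded ? RuntimeMode.BATCH : RuntimeMode.STREAMING;
            default:
                // STREAMING accepts both bounded and unbounded sources.
                return RuntimeMode.STREAMING;
        }
    }

    public static void main(String[] args) {
        List<Boundedness> mixed = List.of(Boundedness.BOUNDED, Boundedness.UNBOUNDED);
        System.out.println(resolve(RuntimeMode.AUTOMATIC, mixed));
    }
}
```

Note that a bounded job configured with STREAMING stays in STREAMING mode, which is what keeps existing fromCollection() or PROCESS_ONCE programs unchanged under the proposed default.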
The new setting will be configurable via the configuration, but we also propose to introduce a programmatic API:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeMode.BATCH);
Task scheduling and shuffle mode in BATCH vs. STREAMING mode
The user code of an application in Flink is translated into a graph of tasks similar to the one shown here. Flink’s runtime will then schedule these tasks differently depending on the configured ScheduleMode. Another important decision is the data exchange mode, which controls how data is "shuffled" between the different tasks. This is primarily important for operations that are connected by a keyBy() connection.
In STREAMING mode, all tasks should be scheduled before any data can flow through the pipeline. Records are processed as they come and the output of each task is aggressively pushed to the next task downstream with almost no buffering in between. Mechanisms like Checkpointing build on the fact that all tasks are available throughout the lifecycle of a job in order to provide fault tolerance. Internally, the schedule mode would be EAGER and the shuffle mode would be PIPELINED.
In BATCH mode, the task graph is split into independent regions which either run independently, e.g. “embarrassingly parallel” jobs, or communicate with blocking shuffles, e.g. pipelines containing keyBy(). Each of these regions processes its (bounded) input from start to finish and materialises its output. The lifecycle of that output is independent of that of the task that produced it. This allows for more flexible scheduling where the task can be de-allocated and the freed resources can be allocated to the next ready task. In addition, if a task fails, its predecessors do not necessarily need to be re-executed as their output is already materialised. For more details please see FLIP-119.
Each of the above scheduling algorithms has benefits over the other for its particular setting, but each has its own implications about fault-tolerance and other operational topics such as availability of savepoints.
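The region split described for BATCH mode can be illustrated with a small union-find over the task graph: tasks joined by pipelined edges end up in the same region, while blocking edges separate regions. This is a simplified sketch under that assumption; PipelinedRegionSketch, Edge, and the method names are hypothetical and do not correspond to Flink's scheduler classes.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of splitting a task graph into regions; not Flink's scheduler. */
final class PipelinedRegionSketch {

    /** A directed edge between two tasks; a blocking edge materialises its data. */
    static final class Edge {
        final int from;
        final int to;
        final boolean blocking;

        Edge(int from, int to, boolean blocking) {
            this.from = from;
            this.to = to;
            this.blocking = blocking;
        }
    }

    /** Returns one region id per task; tasks linked by pipelined edges share an id. */
    static int[] regions(int taskCount, List<Edge> edges) {
        int[] parent = new int[taskCount];
        for (int i = 0; i < taskCount; i++) {
            parent[i] = i;
        }
        for (Edge e : edges) {
            if (!e.blocking) {
                // Pipelined edges force producer and consumer to run together.
                parent[find(parent, e.from)] = find(parent, e.to);
            }
        }
        int[] region = new int[taskCount];
        for (int i = 0; i < taskCount; i++) {
            region[i] = find(parent, i);
        }
        return region;
    }

    /** Union-find lookup with path halving. */
    static int find(int[] parent, int x) {
        while (parent[x] != x) {
            parent[x] = parent[parent[x]];
            x = parent[x];
        }
        return x;
    }

    static int countRegions(int[] region) {
        return (int) Arrays.stream(region).distinct().count();
    }

    public static void main(String[] args) {
        // source(0) -> map(1) -> [keyBy, blocking] -> reduce(2) -> sink(3)
        List<Edge> batch = List.of(
                new Edge(0, 1, false), new Edge(1, 2, true), new Edge(2, 3, false));
        System.out.println(countRegions(regions(4, batch)));
    }
}
```

With every edge pipelined, as in STREAMING mode, the same graph collapses into a single region, which is why all tasks must be scheduled at once there.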
We also want to make the shuffle mode configurable because there might be cases where a user wants BATCH execution mode but still wants the data to be exchanged in a pipelined fashion. For this we propose a new configuration option, execution.shuffle-mode. We will not expose a dedicated setter in StreamExecutionEnvironment or ExecutionConfig.
Example Usage:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeMode.BATCH);
Configuration conf = new Configuration();
conf.setString("execution.shuffle-mode", "ALL_EDGES_BLOCKING");
env.configure(conf);
Processing Time Support in BATCH execution mode
The notion of time is crucial in Stream Processing. Time in Streaming comes in two flavours, Processing Time and Event Time. In a nutshell, Processing Time is the wall-clock time on the machine where the record is processed, at the specific instant it is processed, while Event Time is a timestamp usually embedded in the record itself that indicates, for example, when a transaction happened or when a measurement was taken by a sensor. For more details, please refer to the corresponding documentation page.
Based on the definition of Processing Time, we see that the results of a computation that is based on processing time are not reproducible. This is because the same record processed twice will have two different timestamps.
Despite the above, using processing time can be useful in streaming. The reason is that streaming pipelines often ingest their unbounded input in “real time” and jobs are expected to run for a long period of time. As an example, imagine a computation that counts incoming records of a certain type in 1h (event time) windows. In this example, the user may need to wait for ~1h to have the result for a window, as data is ingested as fast as it arrives (real-time). In this case, to have an early (incomplete) indication of the trend of the counter (is it increasing or decreasing), the user may want to use “early firings” every 5 min in processing time.
In essence, the “real-time” nature of streaming computations leads to a correlation between the responsiveness of the system, i.e. how long we have to wait until we get results for our 1h windowing computation, and the wall-clock/processing time. This correlation does not exist in the batch world, where the input dataset is static and known in advance.
We propose to introduce two new configuration options:
- pipeline.processing-time.allow:
- ALLOW: allow API calls to get the current processing time and register timers
- IGNORE: silently ignore calls to register processing-time timers (TODO: what to return from getCurrentProcessingTime())
- FAIL: fail with an exception when API methods that deal with processing time are called
- pipeline.processing-time.end-of-input:
- FIRE_AND_QUIESCE: fire any pending processing-time timers at end-of-input but silently ignore attempts to register new timers
- IGNORE: ignore pending processing-time timers at end-of-input and shut down immediately
These options refer to BOTH batch and streaming and they will make sure that the same job written for streaming can also run for batch. The defaults for STREAMING and BATCH, respectively, should be:
- STREAMING
- pipeline.processing-time.allow: ALLOW
- pipeline.processing-time.end-of-input: FIRE_AND_QUIESCE
- BATCH
- pipeline.processing-time.allow: FAIL
- pipeline.processing-time.end-of-input: IGNORE
We think that we should fail hard in BATCH execution mode when using processing-time API because silently ignoring those API calls could lead to surprising job results.
The proposed default of pipeline.processing-time.end-of-input: FIRE_AND_QUIESCE in STREAMING mode is a change from the current behaviour, but enough users have reported on the mailing list that they were surprised by the current behaviour to warrant this change. See also FLINK-18647.
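To make the two options and their per-mode defaults concrete, here is a minimal plain-Java model. It is only a sketch: ProcessingTimePolicySketch and its methods are hypothetical, timers are represented as a plain list of timestamps, and the open question of what getCurrentProcessingTime() should return under IGNORE is deliberately left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical model of the two proposed processing-time options; not Flink API. */
final class ProcessingTimePolicySketch {

    enum RuntimeMode { STREAMING, BATCH }

    enum Allow { ALLOW, IGNORE, FAIL }

    enum EndOfInput { FIRE_AND_QUIESCE, IGNORE }

    /** Proposed default for pipeline.processing-time.allow. */
    static Allow defaultAllow(RuntimeMode mode) {
        return mode == RuntimeMode.BATCH ? Allow.FAIL : Allow.ALLOW;
    }

    /** Proposed default for pipeline.processing-time.end-of-input. */
    static EndOfInput defaultEndOfInput(RuntimeMode mode) {
        return mode == RuntimeMode.BATCH ? EndOfInput.IGNORE : EndOfInput.FIRE_AND_QUIESCE;
    }

    /** Tries to register a timer; returns true only if it was actually registered. */
    static boolean registerTimer(Allow policy, List<Long> pendingTimers, long timestamp) {
        switch (policy) {
            case ALLOW:
                pendingTimers.add(timestamp);
                return true;
            case IGNORE:
                // Silently drop the registration.
                return false;
            default:
                throw new UnsupportedOperationException(
                        "Processing-time timers are not allowed in this configuration");
        }
    }

    /** Returns the timers that would still fire once the input is exhausted. */
    static List<Long> firedAtEndOfInput(EndOfInput policy, List<Long> pendingTimers) {
        return policy == EndOfInput.FIRE_AND_QUIESCE
                ? new ArrayList<>(pendingTimers)
                : Collections.emptyList();
    }

    public static void main(String[] args) {
        List<Long> timers = new ArrayList<>();
        registerTimer(defaultAllow(RuntimeMode.STREAMING), timers, 5_000L);
        System.out.println(firedAtEndOfInput(defaultEndOfInput(RuntimeMode.STREAMING), timers));
    }
}
```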
Event Time Support in BATCH execution mode
Flink’s streaming runtime builds on the pessimistic assumption that there are no guarantees about the order of the events. This means that events may come out-of-order, i.e. an event with timestamp t may come after an event with timestamp t+1. In addition, the system can never be sure that no more elements with timestamp t < T can come in the future. To amortise the impact of this out-of-orderness, Flink, along with other frameworks in the streaming space, uses a heuristic called Watermarks. A watermark with timestamp T signals that no element with timestamp t < T will follow.
In the batch world, where the input dataset is assumed to be static and known in advance, there is no need for such a heuristic as, at the very least, elements can be sorted by timestamp so that they are processed in temporal order. For readers familiar with streaming, in BATCH we can assume “perfect watermarks”.
Implications: Given the above, in BATCH mode, we only need to send a MAX_WATERMARK when the end of input is reached or at the end of each key if we decide to process each key independently, as done in DataSet. This will allow all registered timers to fire. This means that user-defined WatermarkAssigners will be ignored.
Although the above does not directly imply any user-visible change, it must be stressed that, in some cases, the same application executed in batch and in streaming may produce different results, because late elements are left out in streaming.
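The difference in results can be reproduced with a toy model of tumbling-window counts. The sketch below makes simplifying assumptions that are not taken from Flink: the watermark is just the largest timestamp seen so far, a window [start, end) counts as closed once the watermark reaches end, and elements for closed windows are dropped. EventTimeSketch and its methods are hypothetical names.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical toy model of late elements in streaming vs. sorted batch input. */
final class EventTimeSketch {

    /**
     * Counts elements per tumbling window (keyed by window start) in arrival order.
     * Assumption: the watermark equals the largest timestamp seen so far, and an
     * element is late (dropped) if its window end is not after the watermark.
     */
    static Map<Long, Integer> streamingCounts(long[] timestampsInArrivalOrder, long windowSize) {
        Map<Long, Integer> counts = new TreeMap<>();
        long watermark = Long.MIN_VALUE;
        for (long ts : timestampsInArrivalOrder) {
            long windowStart = ts - Math.floorMod(ts, windowSize);
            long windowEnd = windowStart + windowSize;
            if (windowEnd <= watermark) {
                continue; // late element: its window has already been closed
            }
            counts.merge(windowStart, 1, Integer::sum);
            watermark = Math.max(watermark, ts);
        }
        return counts;
    }

    /**
     * Batch variant: sort by timestamp first, so no element can be late
     * ("perfect watermarks"). All windows fire at the end of input.
     */
    static Map<Long, Integer> batchCounts(long[] timestamps, long windowSize) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        return streamingCounts(sorted, windowSize);
    }

    public static void main(String[] args) {
        long[] arrival = {1, 2, 11, 3, 12}; // 3 arrives after 11 and is late in streaming
        System.out.println("streaming: " + streamingCounts(arrival, 10));
        System.out.println("batch:     " + batchCounts(arrival, 10));
    }
}
```

In this input the element with timestamp 3 arrives after the watermark has passed the end of its window, so the streaming variant counts two elements in the first window while the batch variant counts three.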
Making EventTime the new default StreamTimeCharacteristic
As described above, event time is the only sensible time characteristic for batch. We therefore propose to change the default value of the StreamTimeCharacteristic from ProcessingTime to EventTime. This means that DataStream API programs that were using event time before now just work without manually changing this setting. Processing-time programs will also still work, because using processing-time timers is not dependent on the StreamTimeCharacteristic. DataStream programs that don't set a TimestampAssigner or WatermarkStrategy will also still work as long as they don't use operations that rely on (event-time) timestamps. This is true for both BATCH and STREAMING execution mode.
The only real user-visible change is that programs that used the KeyedStream.timeWindow()/DataStream.timeWindow() operation, which is dependent on the StreamTimeCharacteristic, will now use event time by default. We don't think this operation is useful because the behaviour can be surprising. We recommend that users always use an explicit processing-time window or event-time window.
Deprecating timeWindow() and related methods
As described above, we think timeWindow() is not a useful operation and therefore propose to deprecate and eventually remove it. The operation can have surprising behaviour and users should use explicit processing-time or event-time operations.
Incremental updates vs. "final" updates in BATCH vs. STREAMING execution mode
Some of the operations on DataStream have semantics that might make sense for stream processing but should behave differently in BATCH execution mode. For example, KeyedStream.reduce() is essentially a reduce on a GlobalWindow with a Trigger that fires on every element. In database terms it produces an UPSERT stream as an output: if you get ten input elements for a key you also get ten output records. For batch processing, it makes more sense to instead only produce one output record per key with the result of the aggregation when we reach the end of stream/key. This will be correct for downstream consumers that expect an UPSERT stream, but it will change the actual physical output stream that they see. We therefore suggest changing the behaviour of these methods in BATCH execution mode to only emit a final result at the end of input:
- KeyedStream#reduce
- KeyedStream#sum,min,max,minBy,maxBy
- KeyedStream#fold
Semantically, you can think of the BATCH mode as enabling a virtual execution.incremental-updates = false setting.
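The two emission behaviours can be compared with a keyed sum written in plain Java. This is an illustrative sketch only; ReduceSemanticsSketch and its methods are hypothetical and merely model what a downstream consumer would see with incremental updates switched on or off.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

/** Hypothetical model of incremental vs. final-only keyed reduce; not Flink API. */
final class ReduceSemanticsSketch {

    /** STREAMING-style: emits the updated aggregate after every input element. */
    static List<Map.Entry<String, Integer>> incremental(
            List<Map.Entry<String, Integer>> input, BinaryOperator<Integer> reducer) {
        Map<String, Integer> state = new LinkedHashMap<>();
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : input) {
            Integer updated = state.merge(e.getKey(), e.getValue(), reducer);
            out.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), updated));
        }
        return out;
    }

    /** BATCH-style: emits exactly one record per key once the input is exhausted. */
    static List<Map.Entry<String, Integer>> finalOnly(
            List<Map.Entry<String, Integer>> input, BinaryOperator<Integer> reducer) {
        Map<String, Integer> state = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : input) {
            state.merge(e.getKey(), e.getValue(), reducer);
        }
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : state.entrySet()) {
            out.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = List.of(
                Map.entry("a", 1), Map.entry("b", 10), Map.entry("a", 2), Map.entry("a", 3));
        System.out.println("incremental: " + incremental(input, Integer::sum));
        System.out.println("final only:  " + finalOnly(input, Integer::sum));
    }
}
```

The last record per key is the same in both outputs, which is why a consumer that treats the stream as UPSERTs reaches the same end state either way.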
Deprecating Relational methods on DataStream
As discussed in FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API), we see the Table API/SQL as the relational API, where we expect users to work with schemas and fields. Going forward, we envision the DataStream API to be a slightly lower-level API, with more explicit control over the execution graph, operations, and state. Having said that, we think it is worth deprecating, and in the future removing, all relational-style methods in DataStream. These methods often use Reflection to access the fields and are thus less performant than providing explicit extractors. The affected methods include:
- DataStream#project
- Windowed/KeyedStream#sum,min,max,minBy,maxBy
- DataStream#keyBy where the key is specified with a field name or index (including ConnectedStreams#keyBy)
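The contrast between name-based field access and an explicit extractor can be shown in plain Java. The sketch below is only an illustration of the general idea: KeyExtractionSketch, Order, and both helper methods are hypothetical, and the reflective lookup shown here is not a description of how Flink resolves field names internally.

```java
import java.lang.reflect.Field;
import java.util.function.Function;

/** Hypothetical contrast of reflective field access and explicit key extractors. */
final class KeyExtractionSketch {

    /** Hypothetical record type with public fields. */
    public static final class Order {
        public final String customer;
        public final int amount;

        public Order(String customer, int amount) {
            this.customer = customer;
            this.amount = amount;
        }
    }

    /** Relational style: the field is looked up by name at runtime via reflection. */
    static Object keyByFieldName(Object record, String fieldName) {
        try {
            Field field = record.getClass().getField(fieldName);
            return field.get(record);
        } catch (ReflectiveOperationException e) {
            // A misspelled field name is only detected when the code runs.
            throw new IllegalArgumentException("No accessible field: " + fieldName, e);
        }
    }

    /** Explicit extractor: checked by the compiler, no reflective lookup involved. */
    static <T, K> K keyBySelector(T record, Function<T, K> selector) {
        return selector.apply(record);
    }

    public static void main(String[] args) {
        Order order = new Order("alice", 42);
        System.out.println(keyByFieldName(order, "customer"));
        System.out.println(keyBySelector(order, o -> o.customer));
    }
}
```

Besides the runtime cost, the name-based variant defers errors such as a misspelled field to execution time, whereas the extractor variant fails at compile time.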
Sinks
Current exactly-once sinks in DataStream rely heavily on Flink’s checkpointing mechanism and will not work well in BATCH execution mode. Support for exactly-once sinks is outside the scope of this FLIP and there will be a separate one coming soon.
Iterations
Iterations are outside the scope of this FLIP and there will be a separate one in the future.
Summary of the Proposed Changes
We want to introduce a new setting execution.runtime-mode, that controls runtime execution behaviour. The different execution modes semantically set a couple of real or virtual configuration options:
- STREAMING:
- execution.schedule-mode = EAGER
- execution.shuffle-mode = ALL_EDGES_PIPELINED
- execution.incremental-updates = true
- BATCH:
- execution.schedule-mode = LAZY_FROM_SOURCES
- execution.shuffle-mode = POINTWISE_EDGES_PIPELINED
- execution.incremental-updates = false
We propose to introduce two new settings pipeline.processing-time.allow and pipeline.processing-time.end-of-input with these defaults:
- STREAMING
- pipeline.processing-time.allow: ALLOW
- pipeline.processing-time.end-of-input: FIRE_AND_QUIESCE
- BATCH
- pipeline.processing-time.allow: FAIL
- pipeline.processing-time.end-of-input: IGNORE
We want to make EventTime the new default StreamTimeCharacteristic.
We want to deprecate and eventually remove timeWindow() and related operations.
We want to deprecate and eventually remove relational methods from the DataStream API because we think they belong in the Table API going forward.
Compatibility, Deprecation, and Migration Plan
Existing users of the DataStream API will not be affected because we propose to keep STREAMING as the default for the new execution.runtime-mode setting. Users of the DataSet API should consider eventually moving their applications to the DataStream API with the new BATCH execution mode.
Test Plan
The new code should, of course, be tested by unit tests and integration tests as well as end-to-end tests. The most important acceptance test is whether we can execute a bounded DataStream program with multiple stages (keyBy()) on a single TaskManager with a single task slot. This is something that is not possible in STREAMING mode because all tasks/operations need to be online at the same time but is something that will be possible in BATCH execution mode.
Rejected Alternatives
We did not come up with good alternatives to this if we assume that we want to go forward with removing the DataSet API.